写点什么

闲鱼端侧如何实现实时 CEP 引擎

  • 2020-02-16
  • 本文字数:5487 字

    阅读完需:约 18 分钟

闲鱼端侧如何实现实时CEP引擎

闲鱼端侧如何实现实时 CEP 引擎

背景

用户来闲鱼,主要是为了获得自己关心的内容。随着闲鱼的体量越来越大,内容也变得越来越丰富。闲鱼基于用户画像,可以将用户关心的内容推送给用户。具体在哪些场景下才需要触发推送?我们定义了很多触发规则,包括停留时长、点击路径等。起初我们把触发规则的逻辑放在服务端(Blink)运行。但实践下来发现 Blink 存在诸多限制:


  • 服务端要对客户端埋点进行数据清洗,考虑到闲鱼的 DAU 已经突破 2000w,这个量是非常庞大的,非常消耗服务端资源;

  • Blink 的策略是实时执行的,同样因为资源问题,现在只能同时上线十几个策略。


如何解决这些问题呢,我们开始考虑能否将 Blink 的策略跑在客户端!

CEP 模型

Blink,作为是 Flink 的一个分支,最初是阿里巴巴内部创建的,针对 Flink 进行了改进,所以我们这里还是围绕 Flink 讨论。CEP(Complex Event Process)是 Flink 中的一个子库,用来快速检测无尽数据流中的复杂模式。

Flink CEP

Flink 的 CEP 的核心是 NFA(Non-determined Finite Automaton),全称叫不确定的有限状态机。提到 NFA,就不得不提 Jagrati Agrawal 等撰写的关于 NFA 模型的论文《Efficient Pattern Matching over Event Streams》,本篇论文中描述了 NFA 的匹配原理。



上面这张图,就是一个不确定的有限状态机,它由状态(State)还有之间的连线(StateTransition)组成的。


  • 状态(State):状态是根据 flink 脚本里面的代码来决定的,最终会有一个 $end$的 Final 状态

  • 转换(StateTransition):State 的转换条件,包括 take/proceed/ignore


不同的条件,代表的含义不同:


  • take: 满足条件,获取当前元素,进入下一状态

  • proceed:不论是否满足条件,不获取当前元素,直接进入下状态(如 optional)并进行判断是否满足条件。

  • ignore:不满足条件,忽略,进入下一状态。


我们只要在端上实现这样一个状态机,就可以实现一个 CEP 引擎。

Python CEP

对于客户端来说,首先要解决的问题是如何构建一个 CEP 环境。经过调研,可以复用集团的端智能容器(Walle),作为 Python 容器可以执行 cep 的策略。


在构建 NFA 之前,首先要解决的一个问题是数据来源,手淘信息流团队有一套完整的解决方案 BehaviX/BehaviR,可以对 UT 埋点进行结构化,能很好的结合 Walle 容器来触发策略。有了事件来源,还需要解决的是 Python 脚本如何执行。Walle 平台可以将多个 Python 脚本打包下载并执行,因此,我们可以将 CEP 封装成一个 Python 的库,然后跟策略脚本一起下发。


最终的整体架构设计如下图所示:



本文重点介绍下如何用 Python 来实现一个 CEP 的编译器,这个编译器主要用来将 CEP 的描述语言转换成为 NFA。

编译器原理

在 Flink 中,java 侧会有一套完善的 API 来编写一个策略脚本,《efficient Pattern Matching over Event Streams》论文中还定义了一套完备的 DSL 描述语言,也是会转化成 java 文件去调用这些 API 去完成匹配。那么接下来会重点讨论,flink 是如何将上述 API 转化成 NFA 去匹配,以及 Python CEP 如何实现上述一套完整 API 接口。

Pattern

在 Flink 里面,是通过 Pattern来构建这个 NFA,首先用它描述这个不确定性状态机。首先是构建一个 Pattern的一个链表,得到这个链表之后,会将每个 Pattern 映射成为 State的图,点与点之间会通过 StateTransition来连接。以下面的 Python 代码为例,看下如何 API 是如何工作的:


例如,需要创建这样一个规则,描述如下:


以 start 事件开始,后续跟随一个 middle 的事件,后面紧跟着一个 end 事件作为结尾


用 Pattern 编写如下所示:


.followed_by('middle').where(SimpleCondition())\.next_('end').where(SimpleCondition())
复制代码


这个代码里面声明了 3 个 Pattern,依次命名为 startmiddleend。Pattern 里面保存了指向前面节点的引用 previous,整个 Pattern 链表构建完如下图所示:



最终拿到的是 end节点的一个引用 Ref,Pattern 中会有一个变量指向前一个节点,这样就可以得到一个 Pattern 的反向链表。


Pattern 的对外接口定义如下:


# 静态方法,用来生成起始的pattern@staticmethoddef begin(self, name):pass# 标记紧接着的事件def followed_by(self, name):pass# 标记不需要紧跟的事件def not_followed_by(self, name):pass# 标记紧跟的事件def next_(self, name):pass# 标记事件循环次数def times(self, times):pass# 标记当前事件触发的条件def where(self, condition):pass# 标记当前事件的and条件def and_(self, condition):pass# 标记当前事件的or条件def or_(self, condition):pass# 用于聚合def group_by(self, fields):pass# 用于聚合,渠道特定字段的值def fields(self, key_by_state_name, field):pass# 用于聚合,统计事件具体的数量def count(self, field, condition):pass
复制代码


不同接口会生成不同的消费策略的节点,具体细节可以参考 StateTransition。有了 Pattern 链表,接下来就需要编译器(Compiler)了,它主要是将 Pattern 链表转化成 NFA 图,首先来看下 NFA 的 2 个核心组件:StateStateTransition

State

结构定义如下:



def __init__(self, name, state_type): self.__name = name # 节点的名称,同Pattern的名称 self.__state_type = state_type # 节点的类型:Start/Normal/Stop/Final self.__state_transitions = [] # 到其他节点的边
复制代码


State 一共有 4 种类型:Start/Final/Normal/Stop


生成 NFA 的过程就是将反向解析 Pattern 链表的过程,大概的过程如下:


  1. 创建一个 $end$的结束节点( Final

  2. 再从后往前创建每个 state 节点,作为中间节点( Normal/Stop

  3. 最后创建一个开始节点( Start)State 的名称就是 Pattern 的节点名称,创建完成之后如下图所示。


Transition

State 代表了当前状态机的状态,不同状态之前的切换定义成 StateTransition


结构定义如下:



def __init__(self, source_state, action, target_state, condition): self.__source_state = source_state # 开始的State节点 self.__action = action # 具体action类型:take/ignore/proceed self.__target_state = target_state # 结束的State节点 self.__condition = condition # 节点之间的判断条件
复制代码


边的生成逻辑跟 Pattern 的事件消费策略相关,以下是事件消费策略:


    STRICT = 0# 严格匹配下个    SKIP_TILL_NEXT = 1# 跳过下一个    SKIP_TILL_ANY = 2# 跳过任意一个    NOT_FOLLOW = 3# 非跟随模式    NOT_NEXT = 4# 非紧邻模式
复制代码


不同的消费策略,得到的状态机如下图所示:



  • STRICT: 如果命中了事件了,会进到下个状态

  • SKIP_TILL_NEXT: 如果命中了会进入下一个状态,否则会再当前节点循环,进入 ignore 的边

  • SKIP_TILL_ANY: 不管是否命中条件,都会一直在当前状态循环

  • NOT_FOLLOW: 如果遇到了一个匹配的,就会进入 Stop 状态

  • NOT_NEXT: 如果命中一条,则进入 Stop 状态


在 Pattern 中,不同的接口会创建出不同的消费策略节点,例如 followed_by接口会创建 SKIP_TILL_NEXT的节点。

Times

如果有的规则,要求特定的事件,循环出现几次,那现在就要用到 times 接口。比如浏览 3 次宝贝这个规则,规则就可以写成:


SimpleCondition
复制代码


最终就会得到一个 Times=3的 Pattern,编译器在拿到这个 Pattern 之后,一样先创建一个的 Final 节点,在处理 times 的时候,会创建重复的节点,只不过名称不同,不同的点之间用 take 链接起来,如下图所示:


Python CEP 聚合

Flink 是通过 InputStream 将匹配的事件转移给 CEPOperator,执行聚合操作;但是在客户端的聚合,一次执行就一个事件流,所以可以将聚合简化到一次匹配过程中,因此我们对于 Flink 的聚合操作做了改造,使其更适合端上的场景。


那么聚合的脚本写法如下:


.followed_by('middle').where(SimpleCondition())\.next_('end').where(self.end_filter)\.group_by('group_by').fields('start', 'userId')
复制代码


这里声明了,以 start节点中的 userId作为聚合的节点,我们就会得到如下的 Pattern链表:



在解析 group_by节点的时候,我们需要做个特殊处理,判断如果有聚合节点,我们就需要再 $end$节点和前面节点之间插入一个聚合的节点和哨兵位节点,哨兵位节点命名为 $aggregationStartState$,最终效果如下图所示:



在 NFA 匹配的过程中,当匹配结束,就可以将匹配到的事件流,传到聚合节点,再进一步聚合。$aggregationStartState$节点和 group_by节点之间,是通过 proceed 结合,不需要满足特定条件就可以执行。


具体的实现过程如下,可见与 Flink 不同的是,我们创建了一个特殊的 State节点 AggregationState


def __create_aggregation_state(self, sink_state):# 渠道聚合节点的condition    _aggregation_condition = self.__current_pattern.get_aggregation_condition()
# 创建AggregationState not_next = AggregationState( self.__current_pattern.get_name(),StateType.Normal, _aggregation_condition.get_key_by_state_name(), _aggregation_condition.get_field()) self.__states.append(not_next)
# 获取take的条件 take_condition = self.__get_take_condition(self.__current_pattern) not_next.add_take(sink_state, take_condition)
# 将游标指向上一个节点 self.__following_pattern = self.__current_pattern self.__current_pattern = self.__current_pattern.get_previous()
return not_nex
复制代码

Show me the code

讲了太多原理的东西,接下来看下代码里面如何工作的,先来看下如何来编写一个 CEP 策略。

策略脚本

现在看下如何写一个完整的 python 版本的 cep 规则,以宝贝详情页为例,规则描述如下:


需要匹配用户查看 3 次宝贝详情页


那规则的写法如下:


_pattern = Pattern.begin('e1').where(KVCondition('scene', 'Page_xyItemDetail')).times(3)
# 2. 将需要匹配的事件流_batch_data和待匹配的Pattern# CEP内部会先将pattern转化成NFA,然后再用NFA去匹配事件流_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)
# 用来选择的逻辑def select_function(data):pass
# 3. 匹配完成,通过cep的select接口查询匹配到的结果self.result = _cep.select(select_function)
复制代码


CEP.pattern()函数里面,会先创建 NFA,然后去进行匹配,可见整个匹配策略脚本非常的短小精悍。

生成 NFA

如下代码用来将 Pattern链表转化成 NFA图:


if self.__current_pattern.get_quantifier().get_consuming_strategy() == ConsumingStrategy.NOT_FOLLOW:raiseException('NotFollowedBy is not supported as a last part of a Pattern!')# 校验Pattern的名称,必须唯一self.__check_pattern_name_uniqueness()# 校验Pattern的策略self.__check_pattern_skip_strategy()# 首先创建Final节点sink_state = self.__create_ending_state()# 判定是否有聚合节点if self.__current_pattern.get_aggregation_condition() isnotNone:# 首先创建聚合节点    sink_state = self.__create_aggregation_state(sink_state)# 然后创建聚合几点的起始节点    sink_state = self.__create_aggregation_start_state(sink_state)# 创建状态机中的中间节点,此函数会循环知道Start节点的Patternsink_state = self.__create_middle_states(sink_state)# 最后创建Start节点self.__create_start_state(sink_state)# 根据state列表和window来创建NFAreturn NFA(self.__states, self.__window_time, False)
复制代码

效果

闲鱼已经上了几个策略,整体看来比较稳定,不过还有很多优化的空间。从实测效果来看,端侧从触发策略到执行 Action 用时不会超过 1s,其中还包含了一次网络请求的时间。

性能数据

  • 执行时间



单个脚本,执行时间大概在 100ms 左右。


  • 内存使用



现在内存使用峰值还是比较高,大概在 15M 左右。关于内存过大的问题,目前正在讨论一个方案:Python CEP 可以持久化当前 NFA 的状态,然后再触发策略的时候,只带从未触发过的事件流,避免很多重复计算。之前运行一次脚本要处理 500 个事件,现在可能就缩减到 100 之内,可以极大的减小内存消耗。同时带来另外一个问题,就是执行脚本的都会有一个 IO 操作,耗时会增加。

Flink 与客户端对比

现在对于 Flink 和客户端 Python CEP 做一个简单的对比:



相比 Flink,端侧 CEP 还是有它的优势,在端侧可以直接利用客户端的埋点信息进行计算,运行时长缩减了 80%,而且也支持动态发布。Python 脚本支持 2 端通投,在保证 2 端埋点一致的前提下,也极大的减少了维护成本。

未来

现在端计算还存在很多待优化的地方:


  1. 端计算是用 Python 实现,无法做到像 Flink 的状态机常驻内存,每次都要重新创建匹配,带来了额外的消耗

  2. 在事件流的清洗上面,现在是通过回朔拿到之前的事件流,存在大量的重复计算,后续可以借鉴 Flink 的 Window 机制来进行优化。

  3. 目前编译器暂时还不支持 Group Pattern,后续还要对其进行扩展。

  4. Python 脚本现在还是需要手动编写,后续还可以考虑通过 DSL 来自动生成。


整体看来,Python 脚本执行策略还是有一定的性能损耗,不管是在创建 NFA 或者是匹配过程,后续可以考虑将匹配引擎用 C++实现,然后真正做到常驻内存,从而做到高效的执行效率。后期做到 NFA 持久化之后,C++也可以复用整套持久化协议,从而优化整个引擎的执行效率。除此之外,策略在执行的过程中,还可以考虑用 TensorFlowLite 优化参数策略参数,从而真正做到千人前面的策略。


本文转载自公众号闲鱼技术(ID:XYtech_Alibaba)。


原文链接


https://mp.weixin.qq.com/s/_jPQvX3aiKp4ScnqgHgyow


2020-02-16 10:002818

评论

发布
暂无评论
发现更多内容

人工智能 | 无人驾驶汽车:道路安全和效率的未来

测吧(北京)科技有限公司

测试

人工智能 | 自动驾驶与城市规划

测吧(北京)科技有限公司

测试

Mybatis in Springboot

Kevin_913

Java mybatis

大模型训练:文本分类的未来之路

百度开发者中心

大模型训练 #人工智能

实现语言理解与生成的新前沿

百度开发者中心

大模型训练 #人工智能 LLM

人工智能 | 自动驾驶汽车的未来:道路安全与出行革命

测吧(北京)科技有限公司

测试

城市交通的未来服务:迎接便捷、可持续和智能的交通体验

测吧(北京)科技有限公司

测试

ABAQUS常用功能总结,推荐给自学的朋友们!

思茂信息

abaqus abaqus软件 abaqus有限元仿真

人工智能—走向智能化生活

测吧(北京)科技有限公司

测试

人工智能 | AI驾驶系统:改变汽车产业的革命

测吧(北京)科技有限公司

测试

人工智能 | 机器学习与自动驾驶

测吧(北京)科技有限公司

测试

人工智能 | 革命性的物流与运输

测吧(北京)科技有限公司

测试

Lightroom Classic for mac(Lrc2021) 10.3激活版

mac

LRC2022 照片编辑软件 苹果mac Windows软件 Lightroom Classic 2022

如何有效的给出反馈(二)

ShineScrum捷行

反馈 敏捷教练 敏捷教练引导 高管

优化模型之“平均检出率”

矩视智能

深度学习 机器视觉

人工智能 | 自动驾驶与卫星导航

测吧(北京)科技有限公司

测试

EazyDraw for mac(矢量图绘制软件) 11.2.0中文直装版

mac

苹果mac Windows软件 矢量绘图软件 EazyDraw

预训练模型在NLP中的应用与优化

百度开发者中心

自然语言处理 大模型训练 #人工智能

七张图解锁Mybatis整体脉络,让你轻松拿捏面试官

小小怪下士

Java 程序员 mybatis

一文教你如何发挥好 TDengine Grafana 插件作用

TDengine

时序数据库 ​TDengine

深度学习与预训练语言的突破

百度开发者中心

自然语言处理 大模型训练 人工智能「

人工智能 | 自动驾驶的伦理挑战

测吧(北京)科技有限公司

测试

华为云CodeArts Check代码检查插件(CodeArts IDE本地版本)使用指南

华为云PaaS服务小智

云计算 软件开发 华为云 代码检查

🔥🔥🔥序幕:AIGC 进入“平民化”时代

石头爱分享

AI openai AIGC GPT-4 MidJourney

用友招聘云助力中企出海,充盈全球化人才蓄水池

用友BIP

招聘 中企出海

人工智能 | 自动驾驶与环境保护

测吧(北京)科技有限公司

测试

强化学习与深度学习相结合的新趋势

百度开发者中心

#人工智能 LLM 大语言模型

人工智能 | 自动驾驶技术:实现未来的智能交通

测吧(北京)科技有限公司

测试

腾讯云大数据ES:一文秒懂!使用Elasticsearch进行数据分析

腾讯云大数据

elasticsearch

中企全球化案例-能源业:“1+2+3+N”,建设全球领先的智慧司库平台

用友BIP

全球司库 中企出海

人工智能 | AI驾驶系统的安全性

测吧(北京)科技有限公司

测试

闲鱼端侧如何实现实时CEP引擎_容器_景松_InfoQ精选文章