抖音技术能力大揭密!钜惠大礼、深度体验,尽在火山引擎增长沙龙,就等你来! 立即报名>> 了解详情
写点什么

复杂业务下,我们为何选择 Akka 作为异步通信框架?

2019 年 1 月 29 日

复杂业务下,我们为何选择Akka作为异步通信框架?

Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。


项目背景

某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。


经过交流,我们发现在实际生产环境中,在各方面存在一些问题:


  • 异步:所有门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不确定。而模型训练和预测均依赖这部分数据,这就意味这无法为模型训练和预测设置统一的开始入口。

  • 高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备次日营业,留给模型训练和模型预测回吐预测结果的时间大概为 3 小时。如果每个门店的预测指标有 2 至 3 项,那么需要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程。

  • 容错:由于门店数量众多且情况各不相同,仍然有很多潜在的因素可能导致流程出错或失败。原则上,某次流程的失败不应该对其他流程造成任何影响,每个流程在平台层面应该成为互相独立的任务。


因此,我们需要一套轻量化的分布式服务框架,来实现满足上述需求的模型训练预测平台,并在一定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了 Akka 框架用于实现平台的内部通信。


选型过程

消息驱动方式——流程异步化

一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不确定,模型相关步骤在执行结果上不确定,如果采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在 Actor 模型中,每个 Actor 作为一个基本计算单元,回应接收到的消息,同时并行的:


  • 发送有限数量的消息给其他 Actor

  • 创建有限数量的新 Actor

  • 指定接受到下一个消息时的行为


上述操作没有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,可以进行无需等待的异步通信。



Actor 模型通信方式


Akka 中的 Actor 本质上就是接收消息并采取行动处理消息的对象,是封装状态和行为的对象,它们唯一的通信方式是交换消息——把消息存放在接收方的邮箱里。Actor 自然形成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到可以被完整地处理。因此,我们将预测任务的各个步骤拆分抽象,并创建类型消息与步骤对应,将每个步骤交给线程级别的 Actor 执行处理,通过发送不同类型的消息来触发创建不同操作的 Actor,让整个预测流程无需等待。


结构——应对高并发

由于绝大多数门店的营业时间大致相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台需要尽可能减少资源占有量,而在流量峰值来临时平台要能够及时响应,保证足够的可用性。


经过讨论,我们确定了采用 Master-Worker 模式的平台结构,Master 负责接收与分配任务,Worker 负责处理执行具体的模型任务。


Master 和 Worker 均为独立的 ActorSystem,管理内部不不同操作逻辑的 Actor,在空闲状态下占有资源很小。Actor 为线程级别,同样仅占用极少量资源,生命周期由 ActorSystem 统一管理。少量请求时,Actor 线程具有很高的复用率,请求并发高时,ActorSystem 会创建大量的 Actor 线程用来承接请求,保证可用性。



Akka 中 Actor 的生命周期


子 Actor——模块化提高容错

每个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程中的网络波动、内容校验出错等情况,都会导致当前预测任务的失败。对于失败的任务,我们希望能够尽可能记录错误信息,为重跑提供先决条件。


在 Akka 中,构建了父子 Actor 的树形监督结构,提供 Actor 的监督机制以保证容错性,把处理响应错误的责任交给出错对象以外的实体。父 Actor 创建子 Actor 来委托处理子任务,同时便会自动地监管它们。子 Actor 列表维护在父 Actor 的上下文中,父 Actor 可以访问它。



Akka 中的 Actor 结构


通过更进一步的拆分细化,我们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种,Prepare 为主要负责数据准备步骤,Executor 负责模型相关步骤,统一由 Worker 端的父 Actor 管理,错误和异常均向上层抛出,由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理。


实践应用

ActorSystem

创建 ActorSystem 时,默认将在 classpath 中寻找 application.conf、 application.json 和 application.properties,并自动加载:


val system=ActorSystem("RsModelActorSystem")val system=ActorSystem("RsModelActorSystem", ConfigFactory.load()) //同上
复制代码


如果想要使用自己的配置文件,可以通过 ConfigFactory 来配置加载:


    val system = ActorSystem("UniversityMessageSystem",            ConfigFactory.load("own-application.conf")) 

val config = ConfigFactory.parseString( s""" |akka.remote.netty.tcp.hostname = $host |akka.actor.provider = akka.remote.RemoteActorRefProvider |akka.remote.enabled-transport = akka.remote.netty.tcp |akka.remote.netty.tcp.port = 2445 """.stripMargin) val system = ActorSystem("RsModelActorSystem", config.withFallback(ConfigFactory.load())) //同上

复制代码


ActorSystem 的配置参数中有大量参数可以自定义,需要根据实际需要修改,例如在该项目中,后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes,需要修改参数 akka.remote.netty.tcp.maximum-frame-size


Actor

一个 Actor 包含了状态、行为、一个邮箱、子 Actor 和一个监管策略,所有这些封装在一个 Actor 引用里。Actor 对象通常包含一些变量来反映其所处的可能状态,Akka-actor 自身的轻量线程与系统的其他部分完全隔离,因此无须担心并发问题。每当一个消息被处理,它会与 Actor 的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作,需要结合实际需求编写具体逻辑。Actor 的邮箱是连接发送者与接收者的纽带,每个 Actor 有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。可以有不同策略的邮箱实现供选择,缺省时为 FIFO。


编写逻辑


在 Actor 类中,主要逻辑均在 receive 方法中实现,通过偏函数方法,执行并返回对应的逻辑:


    //ActorLogging提供Actor内部的日志输出    class RsActor extends Actor with ActorLogging {        override def receive: Receive = {          case MapMessage(parameters) =>              println(parameters.get("code"))            case MapKeyMessage(parameters, key) =>              println(parameters.get(key))
case StringMessage(msg) => println(msg.getBytes().length)
case o: Object => println(o.getClass)
case _: AnyRef => println("233") } }

复制代码


生成引用

生成一个可以接收消息的 Actor 实例主要有两个方法:


    //生成一个基于本地类的Actor实例    val rsActor = system.actorOf(Props[RsActor], "rsActor")    //生成一个基于远程地址的Actor实例    val rmActor =       system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor")        //  使用!向对应的Actor实例发送消息    rsActor ! StringMessage("test")    rmActor ! MapMessage(Map("code"->"233"))
复制代码


Message

Akka 中对传递的消息内容并没有太严格要求,可以是基本数据类型,也可以是支持序列化的对象:


    //scala的case class便于简洁地创建消息类    case class StringMessage(msg: String) extends Serializable    case class MapMessage(parameters: Map[String, String]) extends Serializable    case class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable
复制代码


其他

Akka 作为一款被广泛使用的开源工具,在实际项目中体现出了很多的优势,异步的消息驱动方式也给我们提供了一套新的思路和实现方法。


作者介绍:李天烨,TalkingData 数据科学家。毕业于东北大学,任职于 TalkingData 数据科学团队,从事数据科学自动化相关工作。


2019 年 1 月 29 日 15:0211392

评论 1 条评论

发布
用户头像
来自遥远南方的老马子 觉得你很优秀
2019 年 01 月 30 日 15:14
回复
没有更多了
发现更多内容

【通俗易懂】JWT-使用的可能正确姿势

Daniel

JWT 6月日更 6 月日更

NCRE考试感想 三级信息安全(上)

万里无云万里天

信息安全 6月日更 NCRE 考试经验

anyRTC Web SDK 实现音视频呼叫功能

anyRTC开发者

音视频 WebRTC RTC sdk

【LeetCode】连续数组Java题解

HQ数字卡

算法 LeetCode 6月日更

亮相Google I/O,字节跳动是这样应用Flutter的

字节跳动技术团队

和12岁小同志搞创客开发:如何驱动各类型传感器?

不脱发的程序猿

DIY 传感器 如何驱动各类型传感器? 创客

Qcon全球软件开发大会 融云分享SDK交付质量保障经验

融云 RongCloud

Overbit Flash|5 月加密货币市场风暴抹去了 90% 以上的 NFT 交易量

Overbit学院

比特币 加密货币 NFT Overbit 保证金交易

☕【JVM技术之旅】全流程化分析Java对象的创建过程

李浩宇/Alex

JVM 6月日更 6 月日更 对象布局 内存结构

【融云技术】超大规模并发下自定义属性的设置与分发

融云 RongCloud

Golang Testing 概览 - 基本篇

hedzr

golang Unit Test testing

超超超超级详细的多边形游戏问题分析(动态规划)

若尘

算法 动态规划 六月日更

如履薄冰--亚马逊直运系统重构实录

蔡超

软件架构 软件重构 软件自动化测试

6000年,看懂了「硬核山东」!

浪潮云

云计算

【Flutter 专题】113 图解自定义 ACEPieWidget 饼状图 (二)

阿策小和尚

Flutter 小菜 0 基础学习 Flutter Android 小菜鸟 6 月日更

GitHub火到糊!这份阿里内部10W字Java面试总结,让你薪资翻倍

Java架构追梦

Java 架构 阿里巴巴面试 跳槽 10W字面试题

详解浏览器跨域访问的几种办法

华为云开发者社区

安全 浏览器 跨域 WEB安全 跨域访问

C 语言数据结构的封装方法

实力程序员

裕民银行 x mPaaS | 移动应用“适老化”改造,可不止是字体变大

蚂蚁集团移动开发平台 mPaaS

mPaaS APP开发 移动开发平台

ETL工程师必看!超实用的任务优化与断点执行方案

会飞的鱼

大数据 ETL算法 ETL ETL任务 ETL系统

踩准时钟节拍、玩转时间转换,鸿蒙轻内核时间管理有妙招

华为云开发者社区

鸿蒙 时间管理 计数器 时间转换 计时

华云大咖说 | 安超OS全面升级 最新亮点解密

华云数据

🏆【声网Agora】「WebRTC-如何搭建语音认证服务」

李浩宇/Alex

WebRTC RTC征文大赛 Agora 6月日更 6 月日更

获5项大奖,发布《云计算开放应用架构标准》,阿里云持续领航云原生

阿里巴巴中间件

音视频学习--弱网对抗技术相关实践

Fenngton

网络 实时音视频 视频编解码 弱网下的极限实时视频通信 实时视频

云原生时代跨语言间微服务的打法

Damon

Java 微服务 云原生 SpringCloud 6月日更

为鸿蒙OS说两句公道话(我对鸿蒙OS的一些看法)

Phoenix

5W1H聊开源之What——开源是什么?

禅道项目管理

开源 软件 开发

有点难的知识点:Webpack Chunk 分包规则详解

范文杰

webpack 六月日更

分库分表 vs NewSQL 数据库

xcbeyond

分库分表 6月日更

C++多线程强制终止

华为云开发者社区

c++ 安全 线程 多线程 可信

Study Go: From Zero to Hero

Study Go: From Zero to Hero

复杂业务下,我们为何选择Akka作为异步通信框架?-InfoQ