11 月 19 - 20 日 Apache Pulsar 社区年度盛会来啦,立即报名! 了解详情
写点什么

Flink 流式计算在节省资源方面的简单分析

  • 2019-11-12
  • 本文字数:5951 字

    阅读完需:约 20 分钟

Flink 流式计算在节省资源方面的简单分析

Flink 在小米的发展简介


小米在流式计算方面经历了 Storm、Spark Streaming 和 Flink 的发展历程;从 2019 年 1 月接触 Flink 到现在,已经过去了大半年的时间了。对 Flink 的接触越深,越能感受到它在流式计算方面的强大能力;无论是实时性、时间语义还是对状态计算的支持等,都让很多之前需要复杂业务逻辑实现的功能转变成了简洁的 API 调用。还有不断完善的 Flink SQL 功能也让人充满期待,相信实时数据分析的门槛会越来越低,更多的业务能够挖掘出数据更实时更深入的价值。


在这期间,我们逐步完善了稳定性、作业管理、日志和监控收集展示等关系到用户易用性和运维能力的特性,帮助越来越多的业务接入到了 Flink。


流式作业管理服务的界面:



Flink 作业的监控指标收集展示:



Flink 作业异常日志的收集展示:



Spark Streaming 迁移到 Flink 的效果小结

在业务从 Spark Streaming 迁移到 Flink 的过程中,我们也一直在关注着一些指标的变化,比如数据处理的延迟、资源使用的变化、作业的稳定性等。其中有一些指标的变化是在预期之中的,比如数据处理延迟大大降低了,一些状态相关计算的“准确率”提升了;但是有一项指标的变化是超出我们预期的,那就是节省的资源。


信息流推荐业务是小米从 Spark Streaming 迁移到 Flink 流式计算最早也是使用 Flink 最深的业务之一,在经过一段时间的合作优化后,对方同学给我们提供了一些使用效果小结,其中有几个关键点:


  • 对于无状态作业,数据处理的延迟由之前 Spark Streaming 的 16129ms 降低到 Flink 的 926ms,有 94.2%的显著提升(有状态作业也有提升,但是和具体业务逻辑有关,不做介绍);

  • 对后端存储系统的写入延迟从 80ms 降低到了 20ms 左右,如下图(这是因为 Spark Streaming 的 mini batch 模式会在 batch 最后有批量写存储系统的操作,从而造成写请求尖峰,Flink 则没有类似问题):



  • 对于简单的从消息队列 Talos 到存储系统 HDFS 的数据清洗作业(ETL),由之前 Spark Streaming 的占用 210 个 CPU Core 降到了 Flink 的 32 个 CPU Core,资源利用率提高了 84.8%;


其中前两点优化效果是比较容易理解的,主要是第三点我们觉得有点超出预期。为了验证这一点,信息流推荐的同学帮助我们做了一些测试,尝试把之前的 Spark Streaming 作业由 210 个 CPU Core 降低到 64 个,但是测试结果是作业出现了数据拥堵。这个 Spark Streaming 测试作业的 batch interval 是 10s,大部分 batch 能够在 8s 左右运行完,偶尔抖动的话会有十几秒,但是当晚高峰流量上涨之后,这个 Spark Streaming 作业就会开始拥堵了,而 Flink 使用 32 个 CPU Core 却没有遇到拥堵问题。


很显然,更低的资源占用帮助业务更好的节省了成本,节省出来的计算资源则可以让更多其他的业务使用;为了让节省成本能够得到“理论”上的支撑,我们尝试从几个方面研究并对比了 Spark Streaming 和 Flink 的一些区别:


调度计算 VS 调度数据

对于任何一个分布式计算框架而言,如果“数据”和“计算”不在同一个节点上,那么它们中必须有一个需要移动到另一个所在的节点。如果把计算调度到数据所在的节点,那就是“调度计算”,反之则是“调度数据”;在这一点上 Spark Streaming 和 Flink 的实现是不同的。



Spark 的核心数据结构 RDD 包含了几个关键信息,包括数据的分片(partitions)、依赖(dependencies)等,其中还有一个用于优化执行的信息,就是分片的“preferred locations”


// RDD/** * Optionally overridden by subclasses to specify placement preferences. */protected def getPreferredLocations(split: Partition): Seq[String] = Nil
复制代码


这个信息提供了该分片数据的位置信息,即所在的节点;Spark 在调度该分片的计算的时候,会尽量把该分片的计算调度到数据所在的节点,从而提高计算效率。比如对于 KafkaRDD,该方法返回的就是 topic partition 的 leader 节点信息:


// KafkaRDDoverride def getPreferredLocations(thePart: Partition): Seq[String] = {    val part = thePart.asInstanceOf[KafkaRDDPartition]    Seq(part.host)     // host: preferred kafka host, i.e. the leader at the time the rdd was created  }
复制代码


”调度计算”的方法在批处理中有很大的优势,因为“计算”相比于“数据”来讲一般信息量比较小,如果“计算”可以在“数据”所在的节点执行的话,会省去大量网络传输,节省带宽的同时提高了计算效率。但是在流式计算中,以 Spark Streaming 的调度方法为例,由于需要频繁的调度”计算“,则会有一些效率上的损耗。


首先,每次”计算“的调度都是要消耗一些时间的,比如“计算”信息的序列化 → 传输 → 反序列化 → 初始化相关资源 → 计算执行→执行完的清理和结果上报等,这些都是一些“损耗”。


另外,用户的计算中一般会有一些资源的初始化逻辑,比如初始化外部系统的客户端(类似于 Kafka Producer 或 Consumer);每次计算的重复调度容易导致这些资源的重复初始化,需要用户对执行逻辑有一定的理解,才能合理地初始化资源,避免资源的重复创建;这就提高了使用门槛,容易埋下隐患;通过业务支持发现,在实际生产过程中,经常会遇到大并发的 Spark Streaming 作业给 Kafka 或 HBase 等存储系统带来巨大连接压力的情况,就是因为用户在计算逻辑中一直重复创建连接。


Spark 在官方文档提供了一些避免重复创建网络连接的示例代码,其核心思想就是通过连接池来复用连接:


rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}
复制代码


需要指出的是,即使用户代码层面合理的使用了连接池,由于同一个“计算”逻辑不一定调度到同一个计算节点,还是可能会出现在不同计算节点上重新创建连接的情况。



Flink 和 Storm 类似,都是通过“调度数据”来完成计算的,也就是“计算逻辑”初始化并启动后,如果没有异常会一直执行,源源不断地消费上游的数据,处理后发送到下游;有点像工厂里的流水线,货物在传送带上一直传递,每个工人专注完成自己的处理逻辑即可。


虽然“调度数据”和“调度计算”有各自的优势,但是在流式计算的实际生产场景中,“调度计算”很可能“有力使不出来”;比如一般流式计算都是消费消息队列 Kafka 或 Talos 的数据进行处理,而实际生产环境中为了保证消息队列的低延迟和易维护,一般不会和计算节点(比如 Yarn 服务的节点)混布,而是有各自的机器(不过很多时候是在同一机房);所以无论是 Spark 还是 Flink,都无法避免消息队列数据的跨网络传输。所以从实际使用体验上讲,Flink 的调度数据模式,显然更容易减少损耗,提高计算效率,同时在使用上更符合用户“直觉”,不易出现重复创建资源的情况。


不过这里不得不提的一点是,Spark Streaming 的“调度计算”模式,对于处理计算系统中的“慢节点”或“异常节点”有天然的优势。比如如果 Yarn 集群中有一台节点磁盘存在异常,导致计算不停地失败,Spark 可以通过 blacklist 机制停止调度计算到该节点,从而保证整个作业的稳定性。或者有一台计算节点的 CPU Load 偏高,导致处理比较慢,Spark 也可以通过 speculation 机制及时把同一计算调度到其他节点,避免慢节点拖慢整个作业;而以上特性在 Flink 中都是缺失的。


Mini batch vs streaming


Spark Streaming 并不是真正意义上的流式计算,而是从批处理衍生出来的 mini batch 计算。如图所示,Spark 根据 RDD 依赖关系中的 shuffle dependency 进行作业的 Stage 划分,每个 Stage 根据 RDD 的 partition 信息切分成不同的分片;在实际执行的时候,只有当每个分片对应的计算结束之后,整个个 Stage 才算计算完成。



这种模式容易出现“长尾效应”,比如如果某个分片数据量偏大,那么其他分片也必须等这个分片计算完成后,才能进行下一轮的计算(Spark speculation 对这种情况也没有好的作用,因为这个是由于分片数据不均匀导致的),这样既增加了其他分片的数据处理延迟,也浪费了资源。


而 Flink 则是为真正的流式计算而设计的(并且把批处理抽象成有限流的数据计算),上游数据是持续发送到下游的,这样就避免了某个长尾分片导致其他分片计算“空闲”的情况,而是持续在处理数据,这在一定程度上提高了计算资源的利用率,降低了延迟。



当然,这里又要说一下 mini batch 的优点了,那就在异常恢复的时候,可以以比较低的代价把缺失的分片数据恢复过来,这个主要归功于 RDD 的依赖关系抽象;如上图所示,如果黑色块表示的数据丢失(比如节点异常),Spark 仅需要通过重放“Good-Replay”表示的数据分片就可以把丢失的数据恢复,这个恢复效率是很高的。



而 Flink 的话则需要停止整个“流水线”上的算子,并从 Checkpoint 恢复和重放数据;虽然 Flink 对这一点有一些优化,比如可以配置 failover strategy 为 region 来减少受影响的算子,不过相比于 Spark 只需要从上个 Stage 的数据恢复受影响的分片来讲,代价还是有点大。


总之,通过对比可以看出,Flink 的 streaming 模式对于低延迟处理数据比较友好,Spark 的 mini batch 模式则于异常恢复比较友好;如果在大部分情况下作业运行稳定的话,Flink 在资源利用率和数据处理效率上确实更占优势一些。


数据序列化


简单来说,数据的序列化是指把一个 object 转化为 byte stream,反序列化则相反。序列化主要用于对象的持久化或者网络传输。


常见的序列化格式有 binary、json、xml、yaml 等;常见的序列化框架有 Java 原生序列化、Kryo、Thrift、Protobuf、Avro 等。


对于分布式计算来讲,数据的传输效率非常重要。好的序列化框架可以通过较低 的序列化时间和较低的内存占用大大提高计算效率和作业稳定性。在数据序列化上,Flink 和 Spark 采用了不同的方式;Spark 对于所有数据默认采用 Java 原生序列化方式,用户也可以配置使用 Kryo;而 Flink 则是自己实现了一套高效率的序列化方法。


首先说一下 Java 原生的序列化方式,这种方式的好处是比较简单通用,只要对象实现了 Serializable 接口即可;缺点就是效率比较低,而且如果用户没有指定 serialVersionUID 的话,很容易出现作业重新编译后,之前的数据无法反序列化出来的情况(这也是 Spark Streaming Checkpoint 的一个痛点,在业务使用中经常出现修改了代码之后,无法从 Checkpoint 恢复的问题);当然 Java 原生序列化还有一些其他弊端,这里不做深入讨论。


有意思的是,Flink 官方文档里对于不要使用 Java 原生序列化强调了三遍,甚至网上有传言 Oracle 要抛弃 Java 原生序列化:




相比于 Java 原生序列化方式,无论是在序列化效率还是序列化结果的内存占用上,Kryo 则更好一些(Spark 声称一般 Kryo 会比 Java 原生节省 10x 内存占用);Spark 文档中表示它们之所以没有把 Kryo 设置为默认序列化框架的唯一原因是因为 Kryo 需要用户自己注册需要序列化的类,并且建议用户通过配置开启 Kryo。


虽然如此,根据 Flink 的测试,Kryo 依然比 Flink 自己实现的序列化方式效率要低一些;如图所示是 Flink 序列化器(PojoSerializer、RowSerializer、TupleSerializer)和 Kryo 等其他序列化框架的对比,可以看出 Flink 序列化器还是比较占优势的:



那么 Flink 到底是怎么做的呢?网上关于 Flink 序列化的文章已经很多了,这里我简单地说一下我的理解。


像 Kryo 这种序列化方式,在序列化数据的时候,除了数据中的“值”信息本身,还需要把一些数据的 meta 信息也写进去(比如对象的 Class 信息;如果是已经注册过的 Class,则写一个更节省内存的 ID)。



但是在 Flink 场景中则完全不需要这样,因为在一个 Flink 作业 DAG 中,上游和下游之间传输的数据类型是固定且已知的,所以在序列化的时候只需要按照一定的排列规则把“值”信息写入即可(当然还有一些其他信息,比如是否为 null)。



如图所示是一个内嵌 POJO 的 Tuple3 类型的序列化形式,可以看出这种序列化方式非常地“紧凑”,大大地节省了内存并提高了效率。另外,Flink 自己实现的序列化方式还有一些其他优势,比如直接操作二进制数据等。


凡事都有两面性,自己实现序列化方式也是有一些劣势,比如状态数据的格式兼容性(State Schema Evolution);如果你使用 Flink 自带的序列化框架序进行状态保存,那么修改状态数据的类信息后,可能在恢复状态时出现不兼容问题(目前 Flink 仅支持 POJO 和 Avro 的格式兼容升级)。


另外,用户为了保证数据能使用 Flink 自带的序列化器,有时候不得不自己再重写一个 POJO 类,把外部系统中数据的值再“映射”到这个 POJO 类中;而根据开发人员对 POJO 的理解不同,写出来的效果可能不一样,比如之前有个用户很肯定地说自己是按照 POJO 的规范来定义的类,我查看后发现原来他不小心多加了个 logger,这从侧面说明还是有一定的用户使用门槛的。


// Not a POJO demo.public class Person {  private Logger logger = LoggerFactory.getLogger(Person.class);  public String name;  public int age;}

复制代码


针对这一情况我们做了一些优化尝试,由于在小米内部很多业务是通过 Thrfit 定义的数据,正常情况下 Thrift 类是通过 Kryo 的默认序列化器进行序列化和反序列化的,效率比较低。虽然官方提供了优化文档,可以通过如下方式进行优化,但是对业务来讲也是存在一定使用门槛;


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the serializer included with Apache Thrift as the standard serializer// TBaseSerializer states it should be initialized as a default Kryo serializerenv.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
复制代码


于是我们通过修改 Flink 中 Kryo 序列化器的相关逻辑,实现了对 Thrfit 类默认使用 Thrift 自己序列化器的优化,在大大提高了数据序列化效率的同时,也降低了业务的使用门槛。


总之,通过自己定制序列化器的方式,确实让 Flink 在数据处理效率上更有优势,这样作业就可以通过占用更低的带宽和更少的计算资源完成计算了。


本文小结

Flink 和 Spark Streaming 有非常大的差别,也有各自的优势,这里我只是简单介绍了一下自己浅薄的理解,不是很深入。不过从实际应用效果来看,Flink 确实通过高效的数据处理和资源利用,实现了成本上的优化;希望能有更多业务可以了解并试用 Flink,后续我们也会通过 Flink SQL 为更多业务提供简单易用的流式计算支持,谢谢!


参考文章:


1、《Deep Dive on Apache Flink State》 - Seth Wiesman


2、Flink 原理与实现:内存管理


3、Batch Processing — Apache Spark


2019-11-12 10:342229

评论

发布
暂无评论
发现更多内容

疫情之后,幸获内推,4面京东拿下offer(Java后台研发岗)

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

最全的MySQL总结,助你向阿里“开炮”(面试题+笔记+思维图)

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

Java架构速成笔记:七大专题,1425页考点,挑战P8岗

Java~~~

Java spring 面试 微服务 架构师

13W字!腾讯高工手写“Netty速成手册”,3天能走向实战

Java~~~

Java 面试 微服务 Netty 架构师

我们是如何优雅修改正式环境的表结构,而不影响线上运行的?

做梦都在改BUG

05-高性能复杂度

Lane

从外包进入苏宁再跳槽阿里,分享这五年来我“走过的路”

Java 编程 程序员 面试 计算机

模块四作业

Mr.He

架构实战营

04-可扩展架构

Lane

其实,这就是「幸存者偏差」

非著名程序员

提升认知 认知提升 个人提升 8月日更

Python3 基础语法

Geek_aee0b4

一线架构师开发总结:剖析并发编程+JVM性能,深入Tomcat与MySQL

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

写作 7 堂课——【2. 复利式写作】

LeifChen

写作技巧 8月日更 复利写作

打咩哟!Github热榜第七的SpringBoot笔记(阿里内测版)终于开源!

Java 编程 程序员 架构 面试

Github访问量破百万!原来是美团大牛的分布式架构实战笔记上线了

Java~~~

Java 面试 分布式 微服务 架构师

仅靠七个步骤,4面通过拿offer,终“跳进”字节跳动

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

Minerva -- Airbnb的大规模数据指标系统 Part 2

俞凡

架构 Airbnb 大厂实践 指标

03- 面向复杂度的架构设计

Lane

腾讯技术官手撸笔记,全新演绎“Kafka部署实战”,已开源

Java~~~

Java MySQL 面试 MQ 架构师

阿里开发人员献礼“Java架构成长笔记”,深入内核,拒绝蒙圈

Java~~~

Java spring 面试 微服务 架构师

手撸二叉树之二叉树中第二小的节点

HelloWorld杰少

数据结构与算法 8月日更

新手小白花几个月勇敢裸辞转行网络安全

网络安全学海

网络安全 信息安全 转行 渗透测试 安全漏洞

记录一次基于Qt的内存数据修改工具开发

星河寒水

qt 内存数据修改 Cheat Engine

模块4.存储架构设计

脉动

【架构实战营】毕业设计

swordman

架构实战营

Minerva -- Airbnb的大规模数据指标系统 Part 1

俞凡

架构 Airbnb 大厂实践 指标

华为大佬的“百万级”MySQL笔记,基础+优化+架构一键搞定

Java~~~

Java MySQL 数据库 面试 架构师

二本渣渣5面阿里,从准备简历到“直怼”面试官,经历了什么?

公众号_愿天堂没有BUG

Android Jetpack Compose

Changing Lin

8月日更

初学字典-python

加哥

Flink 流式计算在节省资源方面的简单分析_大数据_王加胜_InfoQ精选文章