点击围观!腾讯 TAPD 助力金融行业研发提效、敏捷转型最佳实践! 了解详情
写点什么

Apache Flink 进阶(五):数据类型和序列化

  • 2019-10-15
  • 本文字数:5151 字

    阅读完需:约 17 分钟

Apache Flink 进阶(五):数据类型和序列化

本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、360 数据开发高级工程师马庆祥老师分享。文章主要从如何为 Flink 量身定制的序列化框架、Flink 序列化的最佳实践、Flink 通信层的序列化以及问答环节四部分展开。如果你对于 Apache Flink 了解不多,可以先阅读Apache Flink 零基础入门系列文章。

为 Flink 量身定制的序列化框架

为什么定制?

为什么要为 Flink 量身定制序列化框架?


大家都知道现在大数据生态非常火,大多数技术组件都是运行在 JVM 上的,Flink 也是运行在 JVM 上,基于 JVM 的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临 JVM 的一些问题,比如 Java 对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。


现在 Java 生态圈中已经有许多序列化框架,比如说 Java serialization, Kryo, Apache Avro 等等。但是 Flink 依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若 Flink 选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。

Flink 的数据类型


Flink 在其内部构建了一套自己的类型系统,Flink 现阶段支持的类型分类如图所示,从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。Flink 支持任意的 Java 或是 Scala 类型。不需要像 Hadoop 一样去实现一个特定的接口(org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。



那这么多的数据类型,在 Flink 内部又是如何表示的呢?图示中的 Person 类,复合类型的一个 Pojo 在 Flink 中是用 PojoTypeInfo 来表示,它继承至 TypeInformation,也即在 Flink 中用 TypeInformation 作为类型描述符来表示每一种要表示的数据类型。

TypeInformation


TypeInformation 的思维导图如图所示,从图中可以看出,在 Flink 中每一个具体的类型都对应了一个具体的 TypeInformation 实现类,例如 BasicTypeInformation 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具体的对应了一个 TypeInformation。然后还有 BasicArrayTypeInformation、CompositeType 以及一些其它类型,也都具体对应了一个 TypeInformation。


TypeInformation 是 Flink 类型系统的核心类。对于用户自定义的 Function 来说,Flink 需要一个类型信息来作为该函数的输入输出类型,即 TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器 TypeSerializer,并用于执行语义检查,比如当一些字段在作为 joing 或 grouping 的键时,检查这些字段是否在该类型中存在。


如何使用 TypeInformation?下面的实践中会为大家介绍。

Flink 的序列化过程


在 Flink 序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?每一个具体的数据类型都对应一个 TypeInformation 的具体实现,每一个 TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink 的序列化过程图可以看到 TypeInformation 会提供一个 createSerialize() 方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象 TypeSerializer。


对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfo、WritableTypeIno 等,但针对 GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。


简单的介绍下 Pojo 的类型规则,即在满足一些条件的情况下,才会选用 Pojo 的序列化进行相应的序列化与反序列化的一个操作。即类必须是 Public 的,且类有一个 public 的无参数构造函数,该类(以及所有超类)中的所有非静态 no-static、非瞬态 no-transient 字段都是 public 的(和非最终的 final)或者具有公共 getter 和 setter 方法,该方法遵循 getter 和 setter 的 Java bean 命名约定。当用户定义的数据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进行序列化。


Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink 的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformation、TypeSerializer 和 TypeComparator 即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。



序列化就是将数据结构或者对象转换成一个二进制串的过程,在 Java 里面可以简单地理解成一个 byte 数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的 Tuple 3 这个对象为例,简述一下它的序列化过程。Tuple 3 包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person 包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需要占用四个字节就可以了。根据 int 占用四个字节,这个能够体现出 Flink 可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用 Java 的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。


Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。MemorySegment 具有什么作用呢?


MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1 个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最小的内存分配单元,相当于是 Java 的一个 byte 数组。 每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。

Flink 序列化的最佳实践

最常见的场景

Flink 常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建 TypeInformation,具体介绍如下:


  • 注册子类型:如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让 Flink 了解这些子类型会大大提高性能。可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 中调用 .registertype (clazz) 注册子类型信息。

  • 注册自定义序列化:对于不适用于自己的序列化框架的数据类型,Flink 会使用 Kryo 来进行序列化,并不是所有的类型都与 Kryo 无缝连接,具体注册方法在下文介绍。

  • 添加类型提示:有时,当 Flink 用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示 TypeHint,这个通常只在 Java API 中需要。

  • 手动创建一个 TypeInformation:在某些 API 调用中,这可能是必需的,因为 Java 的泛型类型擦除导致 Flink 无法推断数据类型。


其实在大多数情况下,用户不必担心序列化框架和注册类型,因为 Flink 已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。

实践–类型声明

类型声明去创建一个类型信息的对象是通过哪种方式?通常是用 TypeInformation.of() 方法来创建一个类型信息的对象,具体说明如下:


  • 对于非泛型类,直接传入 class 对象即可。


 PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
复制代码


  • 对于泛型类,需要通过 TypeHint 来保存泛型类型信息。


 final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
复制代码


  • 预定义常量。


如 BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,可以直接使用。而且 Flink 还提供了完全等价的 Types 类(org.apache.flink.api.common.typeinfo.Types)。特别需要注意的是,flink-table 模块也有一个 Types 类(org.apache.flink.table.api.Types),用于 table 模块内部的类型定义信息,用法稍有不同。使用 IDE 的自动 import 时一定要小心。


  • 自定义 TypeInfo 和 TypeInfoFactory。



通过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存储更紧凑,运行时也更高效。需要注意在自定义类上使用 @TypeInfo 注解,随后创建相应的 TypeInfoFactory 并覆盖 createTypeInfo() 方法。

实践–注册子类型

Flink 认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。


StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType() 方法用来向 Flink 注册子类信息。


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();Env. registerType(typeClass);
复制代码



在 registerType() 方法内部,会使用 TypeExtractor 来提取类型信息,如上图所示,获取到的类型信息属于 PojoTypeInfo 及其子类,那么需要将其注册到一起,否则统一交给 Kryo 去处理,Flink 并不过问(这种情况下性能会变差)。

实践–Kryo 序列化

对于 Flink 无法序列化的类型(例如用户自定义类型,没有 registerType,也没有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理,如果 Kryo 仍然无法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有两种解决方案:


  • 强制使用 Avro 来代替 Kryo。


 env.getConfig().enableForceAvro(); 
复制代码


  • 为 Kryo 增加自定义的 Serializer 以增强 Kryo 的功能。


 env.getConfig().addDefaultKryoSerializer(clazz, serializer);
复制代码


注:如果希望完全禁用 Kryo(100% 使用 Flink 的序列化机制),可以通过 Kryo-env.getConfig().disableGenericTypes() 的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。

Flink 通信层的序列化

Flink 的 Task 之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入 NetworkBufferPool,然后下层的 Task 读出之后再进行反序列化操作,最后进行逻辑处理。


为了使得记录以及事件能够被写入 Buffer,随后在消费时再从 Buffer 中读出,Flink 提供了数据记录序列化器(RecordSerializer)与反序列化器(RecordDeserializer)以及事件序列化器(EventSerializer)。


Function 发送的数据被封装成 SerializationDelegate,它将任意元素公开为 IOReadableWritable 以进行序列化,通过 setInstance() 来传入要序列化的数据。


在 Flink 通信层的序列化中,有几个问题值得关注,具体如下:


  • 何时确定 Function 的输入输出类型?



在构建 StreamTransformation 的时候通过 TypeExtractor 工具确定 Function 的输入输出类型。TypeExtractor 类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。


  • 何时确定 Function 的序列化/反序列化器?


构造 StreamGraph 时,通过 TypeInfomation 的 createSerializer() 方法获取对应类型的序列化器 TypeSerializer,并在 addOperator() 的过程中执行 setSerializers() 操作,设置 StreamConfig 的 TYPE_SERIALIZER_IN_1 、 TYPE_SERIALIZER_IN_2、 TYPE_SERIALIZER_OUT_1 属性。


  • 何时进行真正的序列化/反序列化操作?这个过程与 TypeSerializer 又是怎么联系在一起的呢?



大家都应该清楚 Tsk 和 StreamTask 两个概念,Task 是直接受 TaskManager 管理和调度的,而 Task 又会调用 StreamTask,而 StreamTask 中真正封装了算子的处理逻辑。在 run() 方法中,首先将反序列化后的数据封装成 StreamRecord 交给算子处理;然后将处理结果通过 Collector 发动给下游(在构建 Collector 时已经确定了 SerializtionDelegate),并通过 RecordWriter 写入器将序列化后的结果写入 DataOutput;最后序列化的操作交给 SerializerDelegate 处理,实际还是通过 TypeSerializer 的 serialize() 方法完成。


这里回顾视频精彩内容。


相关文章:


Apache Flink进阶(四):Flink on Yarn/K8s原理剖析及实践


Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践


Apache Flink进阶(二):时间属性深度解析


Apache Flink进阶(一):Runtime 核心机制剖析


Apache Flink 零基础入门系列文章


2019-10-15 08:075586

评论

发布
暂无评论
发现更多内容

开源星「001 号」落地 FlyFish,欢迎登陆赢神秘大礼包!

云智慧AIOps社区

大前端 低代码 开源项目 数据可视化 大屏可视化

视频聊天源码——一对一直播如何提高直播质量?

开源直播系统源码

软件开发 直播系统源码 开源源码 语音聊天 视频聊天源码

【活动报名】8月13日杭州站-开源遇上大数据

亚马逊云科技 (Amazon Web Services)

大数据 开源

用Python自动生成 图文并茂的数据分析 报告

兆锋

Python pip Office 自动化办公

crm系统哪家好?好用的crm管理系统推荐

优秀

CRM系统

Substrate 源码更新导读八月第1周: 新版事务化存储层启用默认模式, Polkadot v0.9.27发布

彭亚伦

Substrate polkadot 波卡

架构实战营第九模块作业-毕业项目

Geek_53787a

新思科技推动产业革新 为智能网联车系好“安全带”

InfoQ_434670063458

软件 车联网 新思科技

NFT+IDO预售代币合约模式系统开发

l8l259l3365

2022年中国第三方证券APP创新专题分析

易观分析

App 金融 证券

[ Kitex 源码解读 ] Kitex 请求重试的分类及实现原理

baiyutang

Go 微服务架构 云原生 kitex CloudWeGo

43%非常看好TypeScript…解读“2022前端开发者现状报告”

华为云开发者联盟

typescript 开发者 前端

【燃】是时候展现真正的实力了!一文看懂2022华为开发者大赛技术亮点

华为云开发者联盟

云计算 华为云 开发者大赛

【Python】:如何利用Python实现文件操作

翼同学

Python 编程语言 文件操作 8月日更 入门学习

Python语言基本语法元素

北极的三哈

Python 学习 开发语言

融云 x N 世界:构建无限用户实时交互的「元宇宙会场」

融云 RongCloud

isc 元宇宙

如何通过 open-local 玩转容器本地存储? | 龙蜥技术

OpenAnolis小助手

开源 云原生 分布式存储 龙蜥技术 open-local

uniapp电影购票选座系统源码

清风

源码 计算机毕业设计

【Python】:如何处理异常报错?

翼同学

Python 编程语言 异常处理 8月月更 入门学习

华为云全流程护航《流浪方舟》破竹首发,打造口碑爆款

华为云开发者联盟

云计算 后端 华为云 流浪方舟

SpringBatch入门

五毛

sping ETL

现在,怎么挑选舞台租赁LED显示屏?

Dylan

LED LED显示屏

开源一夏 | 使用 HTML、CSS 和 JS 制作一个中国象棋

海拥(haiyong.site)

开源 前端 8月月更

Apache Flink 进阶(五):数据类型和序列化_AICon_马庆祥_InfoQ精选文章