Flink 原理、实战与性能优化 (24):Flink 编程模型 3.4.2

阅读数:4 2019 年 12 月 11 日 20:42

Flink原理、实战与性能优化(24):Flink编程模型 3.4.2

(TypeInformation 信息获取)

内容简介
这是一部以实战为导向,能指导读者零基础掌握 Flink 并快速完成进阶的著作,从功能、原理、实战和调优等 4 个维度循序渐进地讲解了如何利用 Flink 进行分布式流式应用开发。作者是该领域的资深专家,现就职于第四范式,曾就职于明略数据。
全书一共 10 章,逻辑上可以分为三个部分:
第一部分(第 1~2 章)
主要介绍了 Flink 的核心概念、特性、应用场景、基本架构,开发环境的搭建和配置,以及源代码的编译。
第二部分(第 3~9 章)
详细讲解了 Flink 的编程范式,各种编程接口的功能、应用场景和使用方法,以及核心模块和组件的原理和使用。
第三部分(第 10 章)
重点讲解了 Flink 的监控和优化,参数调优,以及对反压、Checkpoint 和内存的优化。

通常情况下 Flink 都能正常进行数据类型推断,并选择合适的 serializers 以及 comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM 就会出现类型擦除的问题,使得 Flink 并不能很容易地获取到数据集中的数据类型信息。同时在 Scala API 和 Java API 中,Flink 分别使用了不同的方式重构了数据类型信息。

1. Scala API 类型信息

Scala API 通过使用 Manifest 和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像 Java API 出现类型擦除的问题,这使得 Scala API 具有非常精密的类型管理机制。同时在 Flink 中使用到 Scala Macros 框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在 Flink 中注册成 TypeInformation 以支持上层计算算子使用。

当使用 Scala API 开发 Flink 应用,如果使用到 Flink 已经通过 TypeInformation 定义的数据类型,TypeInformation 类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动 Flink 应用程序时就会报”could not find implicit value for evidence parameter of type TypeInformation”的错误。这时需要将 TypeInformation 类隐式参数引入到当前程序环境中,代码实例如下:

复制代码
import org.apache.flink.api.scala._

2. Java API 类型信息

由于 Java 的泛型会出现类型擦除问题,Flink 通过 Java 反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示(Ctype Himts)来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单 3-3 通过在 returns 方法中传入 TypeHint实例指定输出参数类型,帮助 Flink 系统对输出类型进行数据类型参数的推断和收集。

代码清单 3-3 定义 Type Hint 输出类型参数
复制代码
DataStream<Integer> typeStream = input
.flatMap(new MyMapFunction<String, Integer>())
.returns(new TypeHint<Integer>() {// 通过 returns 方法指定返回参数类型
});
// 定义泛型函数,输入参数类型为 <T,O>, 输出参数类型为 O
class MyMapFunction<T, O> implements MapFunction<T, O> {
public void flatMap(T value, Collector<O> out) {
// 定义计算逻辑
}
}

在使用 Java API 定义 POJOs 类型数据时,PojoTypeInformation 为 POJOs 类中的所有字段创建序列化器,对于标准的类型,例如 Integer、String、Long 等类型是通过 Flink 自带的序列化器进行数据序列化,对于其他类型数据都是直接调用 Kryo 序列化工具来进行序列化。

通常情况下,如果 Kryo 序列化工具无法对 POJOs 类序列化时,可以使用 Avro 对 POJOs 类进行序列化,如下代码通过在 ExecutionConfig 中调用 enableForceAvro() 来开启 Avro 序列化。

复制代码
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// 开启 Avro 序列化方式
env.getConfig().enableForceAvro();

如果用户想使用 Kryo 序列化工具来序列化 POJOs 所有字段,则在 ExecutionConfig 中调用 enableForceKryo() 来开启 Kryo 序列化。

复制代码
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

如果默认的 Kryo 序列化类不能序列化 POJOs 对象,通过调用 ExecutionConfig 的 addDefault-KryoSerializer() 方法向 Kryo 中添加自定义的序列化器。

复制代码
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends
Serializer<?>> serializerClass)

3. 自定义 TypeInformation

除了使用已有的 TypeInformation 所定义的数据格式类型之外,用户也可以自定义实现 TypeInformation,来满足的不同的数据类型定义需求。Flink 提供了可插拔的 Type Information Factory 让用户将自定义的 TypeInformation 注册到 Flink 类型系统中。如下代码所示只需要通过实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory 接口,返回相应的类型信息。

  • 通过 @TypeInfo 注解创建数据类型,定义 CustomTuple 数据类型。
复制代码
@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0, T1> {
public T0 field0;
public T1 field1;
}
  • 然后定义 CustomTypeInfoFactory 类继承于 TypeInfoFactory,参数类型指定 CustomTuple。最后重写 createTypeInfo 方法,创建的 CustomTupleTypeInfo 就是 CustomTuple 数据类型 TypeInformation。
复制代码
public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple> {
@Override
public TypeInformation<CustomTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new CustomTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}

Flink原理、实战与性能优化(24):Flink编程模型 3.4.2

购书地址 https://item.jd.com/12518733.html?dist=jd

评论

发布