Flink 原理、实战与性能优化 (23):Flink 编程模型 3.4&3.4.1

阅读数:8 2019 年 12 月 11 日 20:42

Flink原理、实战与性能优化(23):Flink编程模型 3.4&3.4.1

(Flink 数据类型)

内容简介
这是一部以实战为导向,能指导读者零基础掌握 Flink 并快速完成进阶的著作,从功能、原理、实战和调优等 4 个维度循序渐进地讲解了如何利用 Flink 进行分布式流式应用开发。作者是该领域的资深专家,现就职于第四范式,曾就职于明略数据。
全书一共 10 章,逻辑上可以分为三个部分:
第一部分(第 1~2 章)
主要介绍了 Flink 的核心概念、特性、应用场景、基本架构,开发环境的搭建和配置,以及源代码的编译。
第二部分(第 3~9 章)
详细讲解了 Flink 的编程范式,各种编程接口的功能、应用场景和使用方法,以及核心模块和组件的原理和使用。
第三部分(第 10 章)
重点讲解了 Flink 的监控和优化,参数调优,以及对反压、Checkpoint 和内存的优化。


(数据类型支持)

Flink 支持非常完善的数据类型,数据类型的描述信息都是由 TypeInformation 定义,比较常用的 TypeInformation 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 类等。TypeInformation 主要作用是为了在 Flink 系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink 内部对数据存储也进行了相应的性能优化。Flink 能够支持任意的 Java 或 Scala 的数据类型,不用像 Hadoop 中的 org.apache.hadoop.io.Writable 而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用 TypeInformation 管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用 Flink 编写应用的过程中的数据类型问题。

1. 原生数据类型

Flink 通过实现 BasicTypeInfo 数据类型,能够支持任意 Java 原生基本类型(装箱)或 String 类型,例如 Integer、String、Double 等,如以下代码所示,通过从给定的元素集中创建 DataStream 数据集。

复制代码
// 创建 Int 类型的数据集
val intStream:DataStream[Int] = env.fromElements(3, 1, 2, 1, 5)
// 创建 String 类型的数据集
val dataStream: DataStream[String] = env.fromElements("hello", "flink")

Flink 实现另外一种 TypeInfomation 是 BasicArrayTypeInfo,对应的是 Java 基本类型数组(装箱)或 String 对象的数组,如下代码通过使用 Array 数组和 List 集合创建 DataStream 数据集。

复制代码
// 通过从数组中创建数据集
val dataStream: DataStream[Int] = env.fromCollection(Array(3, 1, 2, 1, 5))
// 通过 List 集合创建数据集
val dataStream: DataStream[Int] = env.fromCollection(List(3, 1, 2, 1, 5))

2. Java Tuples 类型

通过定义 TupleTypeInfo 来描述 Tuple 类型数据,Flink 在 Java 接口中定义了元祖类(Tuple)供用户使用。Flink Tuples 是固定长度固定类型的 Java Tuple 实现,不支持空值存储。目前支持任意的 Flink Java Tuple 类型字段数量上限为 25,如果字段数量超过上限,可以通过继承 Tuple 类的方式进行拓展。如下代码所示,创建 Tuple 数据类型数据集。

复制代码
// 通过实例化 Tuple2 创建具有两个元素的数据集
val tupleStream2: DataStream[Tuple2[String, Int]] = env.fromElements(new Tuple2("a",1), new Tuple2("c", 2))

3. Scala Case Class 类型

Flink 通过实现 CaseClassTypeInfo 支持任意的 Scala Case Class,包括 Scala tuples 类型,支持的字段数量上限为 22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义 WordCount Case Class 数据类型,然后通过 fromElements 方法创建 input 数据集,调用 keyBy() 方法对数据集根据 word 字段重新分区。

复制代码
// 定义 WordCount Case Class 数据结构
case class WordCount(word: String, count: Int)
// 通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // 根据 word 字段为分区字段,
val keyStream2 = input.keyBy(0) // 也可以通过指定 position 分区

通过使用 Scala Tuple 创建 DataStream 数据集,其他的使用方式和 Case Class 相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple 中的默认字段名称。

复制代码
// 通过 scala Tuple 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),
("c", 2))
// 使用默认字段名称获取字段,其中 _1 表示 tuple 这种第一个字段
tupleStream.keyBy("_1")

4. POJOs 类型

POJOs 类可以完成复杂数据结构的定义,Flink 通过实现 PojoTypeInfo 来描述任意的 POJOs,包括 Java 和 Scala 类。在 Flink 中使用 POJOs 类可以通过字段名称获取字段,例如 dataStream.join(otherStream).where(“name”).equalTo(“personName”),对于用户做数据处理则非常透明和简单,如代码清单 3-2 所示。如果在 Flink 中使用 POJOs 数据类型,需要遵循以下要求:

  • POJOs 类必须是 Public 修饰且必须独立定义,不能是内部类;
  • POJOs 类中必须含有默认空构造器;
  • POJOs 类中所有的 Fields 必须是 Public 或者具有 Public 修饰的 getter 和 setter 方法;
  • POJOs 类中的字段类型必须是 Flink 支持的。
代码清单 3-2 Java POJOs 数据类型定义
复制代码
// 定义 Java Person 类,具有 public 修饰符
public class Person {
// 字段具有 public 修饰符
public String name;
public int age;
// 具有默认空构造器
public Person() {
}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}

定义好 POJOs Class 后,就可以在 Flink 环境中使用了,如下代码所示,使用 fromElements 接口构建 Person 类的数据集。POJOs 类仅支持字段名称指定字段,如代码中通过 Person name 来指定 Keyby 字段。

复制代码
val persionStream = env.fromElements(new Person("Peter",14),new
Person("Linda",25))
// 通过 Person.name 来指定 Keyby 字段
persionStream.keyBy("name")

Scala POJOs 数据结构定义如下,使用方式与 Java POJOs 相同。

复制代码
class Person(var name: String, var age: Int) {
// 默认空构造器
def this() {
this(null, -1)
}
}

5. Flink Value 类型

Value 数据类型实现了 org.apache.flink.types.Value,其中包括 read() 和 write() 两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前 Flink 提供了內建的 Value 类型有 IntValue、DoubleValue 以及 StringValue 等,用户可以结合原生数据类型和 Value 类型使用。

6. 特殊数据类型

在 Flink 中也支持一些比较特殊的数据数据类型,例如 Scala 中的 List、Map、Either、Option、Try 数据类型,以及 Java 中 Either 数据类型,还有 Hadoop 的 Writable 数据类型。如下代码所示,创建 Map 和 List 类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像 POJOs 类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助 Types Hint 帮助 Flink 推断数据类型信息,关于 Tyeps Hmt 介绍可以参考下一小节。

复制代码
// 创建 Map 类型数据集
val mapStream =
env.fromElements(Map("name"->"Peter","age"->18),Map("name"->"Linda",
"age"->25))
// 创建 List 类型数据集
val listStream = env.fromElements(List(1,2,3,5),List(2,4,3,2))

Flink原理、实战与性能优化(23):Flink编程模型 3.4&3.4.1

购书地址 https://item.jd.com/12518733.html?dist=jd

评论

发布