写点什么

通过 Java 来学习 Apache Beam

作者:Fabio Hiroki

  • 2022-06-28
  • 本文字数:5945 字

    阅读完需:约 20 分钟

通过Java来学习Apache Beam

在本文中,我们将介绍 Apache Beam,这是一个强大的批处理和流式处理开源项目,eBay 等大公司用它来集成流式处理管道,Mozilla 用它来在系统之间安全地移动数据。

概览

Apache Beam 是一种处理数据的编程模型,支持批处理和流式处理。


你可以使用它提供的 Java、Python 和 Go SDK 开发管道,然后选择运行管道的后端。

Apache Beam 的优势


Beam 的编程模型


  • 内置的 IO 连接器

  • Apache Beam 连接器可用于从几种类型的存储中轻松提取和加载数据。

  • 主要连接器类型有:

  • 基于文件的(例如 Apache Parquet、Apache Thrift);

  • 文件系统(例如 Hadoop、谷歌云存储、Amazon S3);

  • 消息传递(例如 Apache Kafka、Google Pub/Sub、Amazon SQS);

  • 数据库(例如 Apache Cassandra、Elastic Search、MongoDB)。

  • 作为一个 OSS 项目,对新连接器的支持在不断增长(例如 InfluxDB、Neo4J)。

  • 可移植性:

  • Beam 提供了几个运行管道的 Runner,你可以根据自己的场景选择最合适的,并避免供应商锁定。

  • 分布式处理后端,如 Apache Flink、Apache Spark 或 Google Cloud Dataflow 可以作为 Runner。

  • 分布式并行处理:

  • 默认情况下,数据集的每一项都是独立处理的,因此可以通过并行运行实现优化。

  • 开发人员不需要手动分配负载,因为 Beam 为它提供了一个抽象。

Beam 的编程模型

Beam 编程模型的关键概念:


  • PCollection:表示数据的集合,如从文本中提取的数字或单词数组。

  • PTransform:一个转换函数,接收并返回一个 PCollection,例如所有数字的和。

  • 管道:管理 PTransform 和 PCollection 之间的交互。

  • PipelineRunner:指定管道应该在哪里以及如何执行。

快速入门


一个基本的管道操作包括 3 个步骤:读取、处理和写入转换结果。这里的每一个步骤都是用 Beam 提供的 SDK 进行编程式定义的。


在本节中,我们将使用 Java SDK 创建管道。你可以创建一个本地应用程序(使用 Gradle 或 Maven 构建),也可以使用在线沙盒。示例将使用本地 Runner,因为这样使用 JUnit 断言验证结果会更容易些。

Java 本地依赖

  • beam-sdk-java-core:包含所有的 Beam 模型类。

  • beam-runners-direct-java:默认情况下 Beam SDK 将直接使用本地 Runner,也就是说管道将在本地机器上运行。

乘 2 操作

在第一个例子中,管道将接收到一个数字数组,并将每个元素乘以 2。


第一步是创建管道实例,它将接收输入数组并执行转换函数。因为我们使用 JUnit 运行 Beam,所以可以很容易地创建 TestPipeline 并将其作为测试类的一个字段。如果你更喜欢通过 main 方法来运行,需要设置管道配置参数


@Rulepublic final transient TestPipeline pipeline = TestPipeline.create();
复制代码


现在,我们可以创建作为管道输入的 PCollection。它是一个直接在内存中实例化的数组,但它也可以从支持 Beam 的任何地方读取。


PCollection<Integer> numbers =                pipeline.apply(Create.of(1, 2, 3, 4, 5));
复制代码


然后我们应用我们的转换函数,将每个元素乘以 2。


PCollection<Integer> output = numbers.apply(                MapElements.into(TypeDescriptors.integers())                      .via((Integer number) -> number * 2)      );
复制代码


为了验证结果,我们可以写一个断言。


PAssert.that(output)                .containsInAnyOrder(2, 4, 6, 8, 10);
复制代码


注意,结果不排序,因为 Beam 将每一个元素作为独立的项进行并行处理。


测试到这里就完成了,我们通过调用下面的方法运行管道:


pipeline.run();
复制代码

Reduce 操作

Reduce 操作将多个输入元素进行聚合,产生一个较小的集合,通常只包含一个元素。

MapReduce


现在我们来扩展上面的示例,将所有项乘以 2 后求和,产生一个 MapReduce 转换操作。


每一个 PCollection 转换都会产生一个新的 PCollection 实例,这意味着我们可以使用 apply 方法将转换链接起来。对于这个示例,将在每个元素乘以 2 后使用 Sum 操作:


PCollection<Integer> numbers =            pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output = numbers .apply( MapElements.into(TypeDescriptors.integers()) .via((Integer number) -> number * 2)) .apply(Sum.integersGlobally());
PAssert.that(output) .containsInAnyOrder(30);
pipeline.run();
复制代码

FlatMap 操作

FlatMap 先对每个输入元素应用映射,返回一个新集合,从而产生一个集合的集合。然后再应用 Flat 操作将所有嵌套的集合合并,最终生成一个集合。


下一个示例将把字符串数组转换成包含唯一性单词的数组。


首先,我们声明将作为管道输入的单词列表:


final String[] WORDS_ARRAY = new String[] {          "hi bob", "hello alice", "hi sue"};
final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
复制代码


然后,我们使用上面的列表创建输入 PCollection:


PCollection<String> input = pipeline.apply(Create.of(WORDS));
复制代码


现在,我们进行 FlatMap 转换,它将拆分每个嵌套数组中的单词,并将结果合并成一个列表:


PCollection<String> output = input.apply(      FlatMapElements.into(TypeDescriptors.strings())            .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");
pipeline.run();
复制代码

Group 操作

数据处理的一个常见的任务是根据特定的键进行聚合或计数。我们将计算上一个例子中每个单词出现的次数。


在有了扁平的字符串数组之后,我们可以链接另一个 PTransform:


PCollection<KV<String, Long>> output = input            .apply(                  FlatMapElements.into(TypeDescriptors.strings())                      .via((String line) -> Arrays.asList(line.split(" ")))            )            .apply(Count.<String>perElement());
复制代码

产生结果:

PAssert.that(output).containsInAnyOrder(       KV.of("hi", 2L),       KV.of("hello", 1L),       KV.of("alice", 1L),       KV.of("sue", 1L),       KV.of("bob", 1L));
复制代码

从文件中读取

Beam 的一个原则是可以从任何地方读取数据,所以我们来看看在实际当中如何使用文本文件作为数据源。


下面的示例将读取包含“An advanced unified programming model”文本的文件“words.txt”。然后转换函数将返回一个包含每一个单词的 PCollection。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run();
复制代码

将结果写入文件

从前面的输入示例可以看到,Beam 提供了多个内置的输出连接器。在下面的例子中,我们将计算文本文件“words.txt”(只包含一个句子“An advanced unified programming model")中出现的每个单词的数量,输出结果将写入一个文本文件。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ) .apply(Count.<String>perElement());;
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to("./src/main/resources/wordscount"));
pipeline.run();
复制代码


默认情况下,文件写入也针对并行性进行了优化,这意味着 Beam 将决定保存结果的最佳分片(文件)数量。这些文件位于 src/main/resources 文件夹中,文件名包含了前缀“wordcount”、碎片序号和碎片总数。


在我的笔记本电脑上运行它生成了 4 个分片:


第一个分片(文件名:wordscount-00001-of-00003):


An 1advanced 1
复制代码


第二个分片(文件名:wordscount-00002-of-00003):


unified 1model 1
复制代码


第三个分片(文件名:wordscount-00003-of-00003):


programming 1
复制代码


最后一个分片是空的,因为所有的单词都已经被处理完了。

扩展 Beam

我们可以通过编写自定义转换函数来扩展 Beam。自定义转换器将提高代码的可维护性,并消除重复工作。


基本上,我们需要创建一个 PTransform 的子类,将输入和输出的类型声明为 Java 泛型。然后重写 expand 方法,加入我们的逻辑,它将接受单个字符串并返回包含每个单词的 PCollection。


public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {
@Override public PCollection<String> expand(PCollection<String> input) { return input .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ); } }
复制代码


用 WordsFileParser 来重构测试场景就变成了:


public class FileIOTest {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Test public void testReadInputFromFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( new WordsFileParser() );
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run(); }
@Test public void testWriteOutputToFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply(new WordsFileParser()) .apply(Count.<String>perElement());
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to ("./src/main/resources/wordscount"));
pipeline.run(); }}
复制代码


结果变成了更清晰和更模块化的管道。

时间窗口


Beam 的时间窗口


流式处理中一个常见的问题是将传入的数据按照一定的时间间隔进行分组,特别是在处理大量数据时。在这种情况下,分析每小时或每天的聚合数据比分析数据集的每个元素更有用。


在下面的例子中,我们将假设我们身处金融科技领域,我们正在接收包含金额和交易时间的事件,我们希望获取每天的交易总额。


Beam 提供了一种用时间戳来装饰每个 PCollection 元素的方法。我们可以通过这种方式创建一个代表 5 笔交易的 PCollection:


  • 金额 10 和 20 是在 2022 年 02 月 01 日转账的;

  • 金额 30、40 和 50 是在 2022 年 02 月 05 日转账的。


PCollection<Integer> transactions =      pipeline.apply(            Create.timestamped(                  TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))               )       );
复制代码


接下来,我们将应用两个转换函数:


  • 使用一天的时间窗口对交易进行分组;

  • 把每组的数量加起来。


PCollection<Integer> output =      Transactions             .apply(Window.into(FixedWindows.of(Duration.standardDays(1))))             .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
复制代码


在第一个时间窗口(2022-02-01)中,预计总金额为 30(10+20),而在第二个窗口(2022-02-05)中,我们应该看到总金额为 120(30+40+50)。


PAssert.that(output)                   .inWindow(new IntervalWindow(                       Instant.parse("2022-02-01T00:00:00+00:00"),                       Instant.parse("2022-02-02T00:00:00+00:00")))                 .containsInAnyOrder(30);
PAssert.that(output) .inWindow(new IntervalWindow( Instant.parse("2022-02-05T00:00:00+00:00"), Instant.parse("2022-02-06T00:00:00+00:00"))) .containsInAnyOrder(120);
复制代码


每个 IntervalWindow 实例需要匹配所选时间段的确切开始和结束时间戳,因此所选时间必须是“00:00:00”。

总结

Beam 是一个强大的经过实战检验的数据框架,支持批处理和流式处理。我们使用 Java SDK 进行了 Map、Reduce、Group 和时间窗口等操作。


Beam 非常适合那些执行并行任务的开发人员,可以简化大规模数据处理的机制。


它的连接器、SDK 和对各种 Runner 的支持为我们带来了灵活性,你只要选择一个原生 Runner,如 Google Cloud Dataflow,就可以实现计算资源的自动化管理。


作者简介


Fabio Hiroki 是一位在 Mollie 公司从事金融服务的软件工程师。


原文链接


Introduction to Apache Beam Using Java

2022-06-28 09:015178

评论

发布
暂无评论
发现更多内容

玩游戏想记录一下自己超神的瞬间?那么就来看一下如何使用Unity截图吧

恬静的小魔龙

游戏开发 Unity 游戏引擎

任何时间,任何地点,超级侦探,认真办案!

龙智—DevSecOps解决方案

Jira Atlassian Jira Jira插件

擎创科技加入龙蜥社区,共建智能运维平台新生态

OpenAnolis小助手

开源 操作系统 龙蜥社区 CLA 擎创科技

Plus版SBOM:流水线物料清单PBOM

SEAL安全

开源 软件供应链 软件物料清单 SBOM 软件供应链安全

【高并发】如何实现亿级流量下的分布式限流?这些理论你必须掌握!!

冰河

并发编程 多线程 高并发 协程 异步编程

【IJCAI 2022】参数高效的大模型稀疏训练方法,大幅减少稀疏训练所需资源

阿里云大数据AI技术

深度学习 模型稀疏训练

游戏背包系统,“Inventory Pro插件”,研究学习-----妈妈再也不用担心我不会做背包了(Unity3D)

恬静的小魔龙

游戏开发 Unity 插件 游戏引擎

同事看了我的代码惊呼:居然是这么在Unity中用单例的

恬静的小魔龙

游戏开发 Unity 单例模式 游戏引擎

中文起,Python 字体反爬实战案例,再一点

梦想橡皮擦

Python 爬虫 7月月更

【策略模式】就像诸葛亮的锦囊

掘金安东尼

前端 设计模式 7月月更

只知道预制体是用来生成物体的?看我如何使用Unity生成UI预制体

恬静的小魔龙

游戏开发 Unity 游戏引擎

常见WEB攻击与防御

南城FE

前端 WEB安全 7月月更

JAVA编程规范之SQL 语句

源字节1号

前端开发 后端开发

浅谈低代码技术在物流管理中的应用与创新

王平

Java & Go 专场 | 阿里云中间件开发者线下 Meetup 开启报名

阿里巴巴云原生

Java Go 阿里云 云原生 中间件

让运动自然发生,FITURE打造全新生活方式

科技热闻

【龙智技术指南】Helix4Git简明使用手册

龙智—DevSecOps解决方案

Helix Core Helix4Git

基于Caffe ResNet-50网络实现图片分类(仅推理)的实验复现

华为云开发者联盟

人工智能 推理 昇腾 处理器

异步Servlet在转转图片服务的实践

转转技术团队

Servlet 异步

QCon 大会广州站它来了!独家定制双肩背包等你领取!

InfoQ写作社区官方

Qcon

创新突破!亚信科技助力中国移动某省完成核心账务数据库自主可控改造

亚信AntDB数据库

国产数据库

微软Azure和易观分析联合发布《企业级云原生平台驱动数字化转型》报告

易观分析

数字化转型

如何判断静态代码质量分析工具的性能?这五大因素必须考虑

龙智—DevSecOps解决方案

静态代码分析 代码静态分析 静态代码安全

活动报名 | 玩转 Kubernetes 容器服务提高班正式开营!

阿里巴巴云原生

阿里云 容器 云原生 训练营 课程

阿里云技术专家秦隆:可靠性保障必备——云上如何进行混沌工程

阿里云弹性计算

分布式系统 混沌工程 故障演练

银行理财子公司蓄力布局A股;现金管理类理财产品整改加速

易观分析

金融 银行

用Unity不会几个插件怎么能行?Unity各类插件及教程推荐

恬静的小魔龙

游戏开发 Unity 插件 游戏引擎

Linux常用命令

五分钟学大数据

Linux 7月月更

大话DevOps监控,团队如何选择监控工具?

龙智—DevSecOps解决方案

DevOps 监控 监控软件

2022 年中回顾|一文看懂预训练模型最新进展

澜舟孟子开源社区

人工智能 自然语言处理 算法 nlp 预训练模型

Java编程程序员怎么开发水平?

小谷哥

通过Java来学习Apache Beam_语言 & 开发_InfoQ精选文章