NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

通过 Java 来学习 Apache Beam

作者:Fabio Hiroki

  • 2022-06-28
  • 本文字数:5945 字

    阅读完需:约 20 分钟

通过Java来学习Apache Beam

在本文中,我们将介绍 Apache Beam,这是一个强大的批处理和流式处理开源项目,eBay 等大公司用它来集成流式处理管道,Mozilla 用它来在系统之间安全地移动数据。

概览

Apache Beam 是一种处理数据的编程模型,支持批处理和流式处理。


你可以使用它提供的 Java、Python 和 Go SDK 开发管道,然后选择运行管道的后端。

Apache Beam 的优势


Beam 的编程模型


  • 内置的 IO 连接器

  • Apache Beam 连接器可用于从几种类型的存储中轻松提取和加载数据。

  • 主要连接器类型有:

  • 基于文件的(例如 Apache Parquet、Apache Thrift);

  • 文件系统(例如 Hadoop、谷歌云存储、Amazon S3);

  • 消息传递(例如 Apache Kafka、Google Pub/Sub、Amazon SQS);

  • 数据库(例如 Apache Cassandra、Elastic Search、MongoDB)。

  • 作为一个 OSS 项目,对新连接器的支持在不断增长(例如 InfluxDB、Neo4J)。

  • 可移植性:

  • Beam 提供了几个运行管道的 Runner,你可以根据自己的场景选择最合适的,并避免供应商锁定。

  • 分布式处理后端,如 Apache Flink、Apache Spark 或 Google Cloud Dataflow 可以作为 Runner。

  • 分布式并行处理:

  • 默认情况下,数据集的每一项都是独立处理的,因此可以通过并行运行实现优化。

  • 开发人员不需要手动分配负载,因为 Beam 为它提供了一个抽象。

Beam 的编程模型

Beam 编程模型的关键概念:


  • PCollection:表示数据的集合,如从文本中提取的数字或单词数组。

  • PTransform:一个转换函数,接收并返回一个 PCollection,例如所有数字的和。

  • 管道:管理 PTransform 和 PCollection 之间的交互。

  • PipelineRunner:指定管道应该在哪里以及如何执行。

快速入门


一个基本的管道操作包括 3 个步骤:读取、处理和写入转换结果。这里的每一个步骤都是用 Beam 提供的 SDK 进行编程式定义的。


在本节中,我们将使用 Java SDK 创建管道。你可以创建一个本地应用程序(使用 Gradle 或 Maven 构建),也可以使用在线沙盒。示例将使用本地 Runner,因为这样使用 JUnit 断言验证结果会更容易些。

Java 本地依赖

  • beam-sdk-java-core:包含所有的 Beam 模型类。

  • beam-runners-direct-java:默认情况下 Beam SDK 将直接使用本地 Runner,也就是说管道将在本地机器上运行。

乘 2 操作

在第一个例子中,管道将接收到一个数字数组,并将每个元素乘以 2。


第一步是创建管道实例,它将接收输入数组并执行转换函数。因为我们使用 JUnit 运行 Beam,所以可以很容易地创建 TestPipeline 并将其作为测试类的一个字段。如果你更喜欢通过 main 方法来运行,需要设置管道配置参数


@Rulepublic final transient TestPipeline pipeline = TestPipeline.create();
复制代码


现在,我们可以创建作为管道输入的 PCollection。它是一个直接在内存中实例化的数组,但它也可以从支持 Beam 的任何地方读取。


PCollection<Integer> numbers =                pipeline.apply(Create.of(1, 2, 3, 4, 5));
复制代码


然后我们应用我们的转换函数,将每个元素乘以 2。


PCollection<Integer> output = numbers.apply(                MapElements.into(TypeDescriptors.integers())                      .via((Integer number) -> number * 2)      );
复制代码


为了验证结果,我们可以写一个断言。


PAssert.that(output)                .containsInAnyOrder(2, 4, 6, 8, 10);
复制代码


注意,结果不排序,因为 Beam 将每一个元素作为独立的项进行并行处理。


测试到这里就完成了,我们通过调用下面的方法运行管道:


pipeline.run();
复制代码

Reduce 操作

Reduce 操作将多个输入元素进行聚合,产生一个较小的集合,通常只包含一个元素。

MapReduce


现在我们来扩展上面的示例,将所有项乘以 2 后求和,产生一个 MapReduce 转换操作。


每一个 PCollection 转换都会产生一个新的 PCollection 实例,这意味着我们可以使用 apply 方法将转换链接起来。对于这个示例,将在每个元素乘以 2 后使用 Sum 操作:


PCollection<Integer> numbers =            pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output = numbers .apply( MapElements.into(TypeDescriptors.integers()) .via((Integer number) -> number * 2)) .apply(Sum.integersGlobally());
PAssert.that(output) .containsInAnyOrder(30);
pipeline.run();
复制代码

FlatMap 操作

FlatMap 先对每个输入元素应用映射,返回一个新集合,从而产生一个集合的集合。然后再应用 Flat 操作将所有嵌套的集合合并,最终生成一个集合。


下一个示例将把字符串数组转换成包含唯一性单词的数组。


首先,我们声明将作为管道输入的单词列表:


final String[] WORDS_ARRAY = new String[] {          "hi bob", "hello alice", "hi sue"};
final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
复制代码


然后,我们使用上面的列表创建输入 PCollection:


PCollection<String> input = pipeline.apply(Create.of(WORDS));
复制代码


现在,我们进行 FlatMap 转换,它将拆分每个嵌套数组中的单词,并将结果合并成一个列表:


PCollection<String> output = input.apply(      FlatMapElements.into(TypeDescriptors.strings())            .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");
pipeline.run();
复制代码

Group 操作

数据处理的一个常见的任务是根据特定的键进行聚合或计数。我们将计算上一个例子中每个单词出现的次数。


在有了扁平的字符串数组之后,我们可以链接另一个 PTransform:


PCollection<KV<String, Long>> output = input            .apply(                  FlatMapElements.into(TypeDescriptors.strings())                      .via((String line) -> Arrays.asList(line.split(" ")))            )            .apply(Count.<String>perElement());
复制代码

产生结果:

PAssert.that(output).containsInAnyOrder(       KV.of("hi", 2L),       KV.of("hello", 1L),       KV.of("alice", 1L),       KV.of("sue", 1L),       KV.of("bob", 1L));
复制代码

从文件中读取

Beam 的一个原则是可以从任何地方读取数据,所以我们来看看在实际当中如何使用文本文件作为数据源。


下面的示例将读取包含“An advanced unified programming model”文本的文件“words.txt”。然后转换函数将返回一个包含每一个单词的 PCollection。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run();
复制代码

将结果写入文件

从前面的输入示例可以看到,Beam 提供了多个内置的输出连接器。在下面的例子中,我们将计算文本文件“words.txt”(只包含一个句子“An advanced unified programming model")中出现的每个单词的数量,输出结果将写入一个文本文件。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ) .apply(Count.<String>perElement());;
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to("./src/main/resources/wordscount"));
pipeline.run();
复制代码


默认情况下,文件写入也针对并行性进行了优化,这意味着 Beam 将决定保存结果的最佳分片(文件)数量。这些文件位于 src/main/resources 文件夹中,文件名包含了前缀“wordcount”、碎片序号和碎片总数。


在我的笔记本电脑上运行它生成了 4 个分片:


第一个分片(文件名:wordscount-00001-of-00003):


An 1advanced 1
复制代码


第二个分片(文件名:wordscount-00002-of-00003):


unified 1model 1
复制代码


第三个分片(文件名:wordscount-00003-of-00003):


programming 1
复制代码


最后一个分片是空的,因为所有的单词都已经被处理完了。

扩展 Beam

我们可以通过编写自定义转换函数来扩展 Beam。自定义转换器将提高代码的可维护性,并消除重复工作。


基本上,我们需要创建一个 PTransform 的子类,将输入和输出的类型声明为 Java 泛型。然后重写 expand 方法,加入我们的逻辑,它将接受单个字符串并返回包含每个单词的 PCollection。


public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {
@Override public PCollection<String> expand(PCollection<String> input) { return input .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ); } }
复制代码


用 WordsFileParser 来重构测试场景就变成了:


public class FileIOTest {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Test public void testReadInputFromFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( new WordsFileParser() );
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run(); }
@Test public void testWriteOutputToFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply(new WordsFileParser()) .apply(Count.<String>perElement());
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to ("./src/main/resources/wordscount"));
pipeline.run(); }}
复制代码


结果变成了更清晰和更模块化的管道。

时间窗口


Beam 的时间窗口


流式处理中一个常见的问题是将传入的数据按照一定的时间间隔进行分组,特别是在处理大量数据时。在这种情况下,分析每小时或每天的聚合数据比分析数据集的每个元素更有用。


在下面的例子中,我们将假设我们身处金融科技领域,我们正在接收包含金额和交易时间的事件,我们希望获取每天的交易总额。


Beam 提供了一种用时间戳来装饰每个 PCollection 元素的方法。我们可以通过这种方式创建一个代表 5 笔交易的 PCollection:


  • 金额 10 和 20 是在 2022 年 02 月 01 日转账的;

  • 金额 30、40 和 50 是在 2022 年 02 月 05 日转账的。


PCollection<Integer> transactions =      pipeline.apply(            Create.timestamped(                  TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))               )       );
复制代码


接下来,我们将应用两个转换函数:


  • 使用一天的时间窗口对交易进行分组;

  • 把每组的数量加起来。


PCollection<Integer> output =      Transactions             .apply(Window.into(FixedWindows.of(Duration.standardDays(1))))             .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
复制代码


在第一个时间窗口(2022-02-01)中,预计总金额为 30(10+20),而在第二个窗口(2022-02-05)中,我们应该看到总金额为 120(30+40+50)。


PAssert.that(output)                   .inWindow(new IntervalWindow(                       Instant.parse("2022-02-01T00:00:00+00:00"),                       Instant.parse("2022-02-02T00:00:00+00:00")))                 .containsInAnyOrder(30);
PAssert.that(output) .inWindow(new IntervalWindow( Instant.parse("2022-02-05T00:00:00+00:00"), Instant.parse("2022-02-06T00:00:00+00:00"))) .containsInAnyOrder(120);
复制代码


每个 IntervalWindow 实例需要匹配所选时间段的确切开始和结束时间戳,因此所选时间必须是“00:00:00”。

总结

Beam 是一个强大的经过实战检验的数据框架,支持批处理和流式处理。我们使用 Java SDK 进行了 Map、Reduce、Group 和时间窗口等操作。


Beam 非常适合那些执行并行任务的开发人员,可以简化大规模数据处理的机制。


它的连接器、SDK 和对各种 Runner 的支持为我们带来了灵活性,你只要选择一个原生 Runner,如 Google Cloud Dataflow,就可以实现计算资源的自动化管理。


作者简介


Fabio Hiroki 是一位在 Mollie 公司从事金融服务的软件工程师。


原文链接


Introduction to Apache Beam Using Java

2022-06-28 09:014933

评论

发布
暂无评论
发现更多内容

面试笔记(一)事务连环炮

U2647

分布式事务 事务隔离级别 事务 4月日更

重磅功能!博睿数据APM助企业从容应对云原生架构演进

博睿数据

应用性能监控产品 Bonree Server 博睿数据 bonree

语音聊天室 anyHouse 使用手册

anyRTC开发者

ios android 音视频 WebRTC RTC

正点原子:STM32F103(战舰)、STM32F407(探索者)、STM32F103(MINI)原理图和PCB

不脱发的程序猿

开发板 stm32 硬件设计 4月日更 正点原子

500+解决方案已搭载,英特尔新至强出道不含糊

E科讯

Linux cat 命令

一个大红包

4月日更

SumSwap节点预售关注度飙升而Uniswap V3版本却备受争议

币圈资讯

火山引擎 Redis 云原生实践

火山引擎开发者社区

云原生 redis cluster

以太坊杀手?NA公链(Nirvana)Chain忠于挑战自己NAC公链

区块链第一资讯

关于机器学习的十大常见问题

澳鹏Appen

人工智能 机器学习 深度学习 大数据 数据

[知识它]一篇文章或一本书是怎么写出来的

知识它

写作技巧 写文章 快速写作 写作方法

年薪百万是社会认同,更是自身价值体现

博文视点Broadview

区块链农产品质量安全溯源,保证农产品品质

13530558032

阿里最强 Python 自动化工具开源了!

星安果

Python 开源 自动化 阿里

Golang 字符串分组

一代咩神

Go 语言

“区块链+电子处方”,医疗跟更健康

电微13828808271

【LeetCode】寻找旋转排序数组中的最小值Java题解

Albert

算法 LeetCode 4月日更

Javascript执行机制-任务队列

Sakura

kubectl top node报错及解决

箭上有毒

GitHub爆火!银四巨作:拼多多/蚂蚁/百度面经分享

比伯

Java 架构 面试 程序人生 技术宅

智慧平安社区建设,创建“三零平安社区”

13530558032

团队协作中,如何写出让同事赞不绝口的代码

有道技术团队

代码规范

EFT【阿凡提】等级规则、收益、排线方法与EFTalk十大关键点

币圈那点事

区块链数据共享平台—追踪、溯源、可信

电微13828808271

区块链+

华云大咖说 | 华云数据与海量数据携手共建国产云生态

华云数据

善盾SD币是什么?

飞亚科技

技术分享第二讲报名!

神策技术社区

大数据 活动 报名 神策

多功能工具箱Quicker+笔记软件flomo,竟然还能擦出这样的火花?

彭宏豪95

效率 工具软件 笔记 工具分享 4月日更

Python实现植物大战僵尸

不脱发的程序猿

Python GitHub 开源 游戏开发 4月日更

基于Vue和Quasar的前端SPA项目crudapi后台管理系统实战之布局菜单嵌套路由(三)

crudapi

Vue crud crudapi quasar 路由

区块链电子合同--助推合同数字化管理

13530558032

通过Java来学习Apache Beam_语言 & 开发_InfoQ精选文章