【AICon】探索RAG 技术在实际应用中遇到的挑战及应对策略!AICon精华内容已上线73%>>> 了解详情
写点什么

RXJava 实例解析

  • 2016-11-16
  • 本文字数:7256 字

    阅读完需:约 24 分钟

要点

  • 响应式编程是一种处理异步数据流的规范
  • 响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持
  • 弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构
  • 响应式编程风格看起来跟 Java Streams API 有点相似,不过本质上是不一样的
  • 如何连接到动态流处理异步数据源

在高并发编程范式的发展过程中,我们使用过很多工具,比如 java.util.concurrent 包、Akka Streams 框架、CompletableFuture 类以及 Netty 框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。

但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。

在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。

首先要记住的是,响应式里所有的东西都是流。Observable 封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。

在运行我们给出的例子之前,需要把 RxJava 的依赖加入到项目里。可以在 Maven 里加入这个依赖:

复制代码
<dependency>
<groupId>io.reactivex.rxjava</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>

Observable 类有几个静态工厂方法和实例方法,它们被用来生成各种新的 Observable 对象,或者把 Observable 对象添加到感兴趣的处理流程里。Observable 是可变的,所以针对它们的操作总是会生成新的 Observable 对象。为了更好地理解我们的例子,我们先来温习一下 Observable 的基本操作,因为在后面的例子里会用到它们。

Observable.just 方法生成一个简单对象,然后返回。例如:

复制代码
Observable.just("Howdy!")

这行代码生成一个新的 Observable 对象,在结束之前触发一个单独的事件,生成字符串“Howdy!”。

可以把新生成的 Observable 对象赋给一个 Observable 变量:

复制代码
Observable<String> hello = Observable.just("Howdy!");

不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个 Observable 对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在 Java 支持 Lambda 表达式,我们就可以使用简洁的声明式风格来表示订阅操作:

复制代码
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);

这段代码仍然会生成字符串“Howdy!”。

跟 Observable 的其它方法一样,just 方法可以被重载:

复制代码
Observable.just("Hello", "World").subscribe(System.out::println);

这行代码会输出

复制代码
Hello
World

just 方法可以被重载,最多可以接收 10 个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。

让我们来看看如果使用列表会发生什么情况:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
Observable.just(words).subscribe(word->System.out.println(word));

这段代码输出一个很平常的结果:

复制代码
[the, quick, brown, fox, jumped, over, the, lazy, dog]

我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入 from 方法:

复制代码
Observable.from(words).subscribe(System.out::println);

这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。

执行这行代码会得到我们想要的多行输出:

复制代码
the
quick
brown
fox
jumped
over
the
lazy
dog

为了能从中获取编号,我们要在 Observable 上多做一些工作。

不过在写代码之前,我们先来看看另外两个操作,range 和 zip。range(i,n) 会创建一个包含 n 个数的流,它的第一个数是从 i 开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。

RX Marbles 这个网站对我们学习响应式编程很有帮助。这个网站使用 JavaScript 渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。

执行一个 zip 操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:

zip 操作通过成对的“zip”映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用 Lambda 表达式来表示。只要其中的一个流完成操作,整个 zip 操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip 可以支持最多 9 个源流的 zip 操作。zipWith 操作可以把一个指定流合并到一个已存在的流里。

现在回到我们的例子上,我们可以使用 range 和 zipWith 操作加入编号,并用 String.format 做映射转换:

复制代码
Observable.from(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. the
2. quick
3. brown
4. fox
5. jumped
6. over
7. the
8. lazy
9. dog

看起来很不错!现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到 flatMap,flatMap 会从 Observable 里获取事件源(对象、集合或数组),并把这些元素分别映射成 Observable,然后把这些 Observable 扁平化成一个单独的 Observable。

对于我们的例子来说,我们会先用 split 方法把每个单词拆分成一个字母数组,然后用 flatMap 创建一个新的 Observable 对象,这个 Observable 对象包含了组成这些单词的所有字母:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g

所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g

我们从小被告知“quick brown fox”这个全字母短句包含了英语里所有的字母,不过在这里我们只看到 25 个,而不是 26 个。现在让我们对这些字母进行排序,找出丢失的那个字母:

复制代码
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. a
2. b
3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z

看样子是字母“s”丢掉了。为了得到我们期望的结果,需要对数组做一点修改:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
1. a
2. b
3. c
4. d
5. e
6. f
7. g
8. h
9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z

现在好了!

到目前为止,所有的代码都跟 Java 8 里引入的 Streams API 很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。

Java Streams 和 Lambda 表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管 Stream 的 parallel 操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到“永不停止”的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。

让我们通过弹珠交互图更好地理解这些概念。

merge 操作可以把最多 9 个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被“扁平化”到一个单独的线程里,包括异常事件和结束事件。

debounce 操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:

可以看到,上下两个图中的“1”之间有一个指定的时间间隔,而 2、3、4、5 之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:

另一个有趣的操作是 amb,它是一种不确定性的操作。

amb 操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以 amb 操作选择了这个流。

如果把第一个流里的“20”往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:

如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是 Bloomberg 或者 Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb 操作就会很有用。

Tick Tock

现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省 CPU,我们让它在周末时每三秒生成一次。我们使用混合型的“节奏器”按照一定的节奏生成数据。

首先,我们要创建一个返回 boolean 的方法,它会检查当前时间是否是周末,如果是就返回 true,否则就返回 false:

复制代码
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}

对于边读这篇文章边在 IDE 里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个 15 秒钟内返回 true,在另一个 15 秒钟内返回 false:

复制代码
private static long start = System.currentTimeMillis();
public static Boolean isSlowTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}

接下来我们创建两个 Observable 对象,fast 和 slow,然后使用过滤器对它们进行调度,并把它们合并起来。

我们使用 Observable.interval 操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从 0 开始计算)。

复制代码
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast 每秒生成一个事件,slow 每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。

现在我们把这两个 Observable 合并到一起,通过使用过滤器让 fast 流在工作日生成数据(或者在 15 秒内),slow 流在周末生成数据(或者在另一个 15 秒内)。

复制代码
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);

最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。

复制代码
clock.subscribe(tick-> System.out.println(new Date()));

为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理 InterruptedException 异常)。

复制代码
Thread.sleep(60_000);

运行代码的结果:

复制代码
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .

可以看到,前面 15 个事件之间的时间间隔都是 1 秒,后面 15 秒内的事件之间的时间间隔是 3 秒,就像我们所期望的那样。

连接到已存在的数据源

以上方法用于创建能够生成静态数据的 Observable 是没有问题的。但如何把 Observable 连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?

静态 Observable 和动态 Observable

首先我们先岔开话题,来介绍一下静态 Observable 和动态 Observable 之间的区别。

到目前为止我们讨论的都是静态 Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远 不够。静态 Observable 只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态 Observable 不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态 Observable 转化成动态 Observable:

  1. 调用 Observable 的 publish 方法,生成一个新的 ConnectableObservable
  2. 调用 ConnectableObservable 的 connect 方法,开始生成数据

要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的 onNext 方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由 RxJava 的 AsyncEmitter 来处理。假设我们有一个叫做 SomeFeed 的数据服务,它会生成报价事件,同时有一个 SomeListener 监听这些报价事件以及其它生命周期事件。在 GitHub 上已经有一个实现,如果你想自己动手运行这些代码,可以去下载。

我们的数据源监听器有两个方法:

复制代码
public void priceTick(PriceTick event);
public void error(Throwable throwable);

PriceTick 类包含了 date、instrument 和 price 字段,还有一个 isLast 方法用来判断它是否是最后一个事件:

(点击放大图像)

让我们来看看如何使用 AsyncEmitter 把 Observable 连接到一个实时的数据源上:

复制代码
SomeFeed<PriceTick> feed = new SomeFeed<>();
Observable<PriceTick> obs =
Observable.fromEmitter((AsyncEmitter<PriceTick> emitter) ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onCompleted();
}
}
{1}
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, AsyncEmitter.BackpressureMode.BUFFER);

这段代码几乎是逐字逐句地从 Observable 类的 Javadoc 里摘抄出来的。AsyncEmitter 封装了监听器(第 5 行)的创建过程,并把它注册到数据源上(第 19 行)。Observable 直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了 AsyncEmitter(第 8 行)。第 20 行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:

BackpressureMode.NONE 不使用回压。如果流的速度无法保持同步,可能会抛出 MissingBackpressureException 或 IllegalStateException。

BackpressureMode.ERROR 会在下游跟不上速度时抛出 MissingBackpressureException。

BackpressureMode.DROP 会在下游跟不上速度时把 onNext 的值丢弃。

BackpressureMode.LATEST 会一直保留最新的 onNext 的值,直到被下游消费掉。

这样生成的是静态 Observable。静态 Observable 在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。

为了把它转化成动态 Observable,让所有订阅者可以实时地接收事件通知,我们必须调用 publish 和 connect 方法,就像之前提到的那样:

复制代码
ConnectableObservable<PriceTick> hotObservable = obs.publish();
hotObservable.connect();

最后,我们可以对它进行订阅并显示报价:

复制代码
hotObservable.subscribe((priceTick) ->
System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
priceTick.getInstrument(), priceTick.getPrice()));

关于作者

Victor Grazi是 InfoQ 的 Java 消息队列负责人。他是 2012 年的 Oracle Java Champion,在 Nomura Securities 负责核心平台工具的开发工作,同时是一名技术顾问和 Java 布道师。他还经常在技术大会上做演讲,是“Java Concurrent Animated”和“Bytecode Explorer”开源项目的负责人。

查看英文原文: RXJava by Example

公众号推荐:

2024 年 1 月,InfoQ 研究中心重磅发布《大语言模型综合能力测评报告 2024》,揭示了 10 个大模型在语义理解、文学创作、知识问答等领域的卓越表现。ChatGPT-4、文心一言等领先模型在编程、逻辑推理等方面展现出惊人的进步,预示着大模型将在 2024 年迎来更广泛的应用和创新。关注公众号「AI 前线」,回复「大模型报告」免费获取电子版研究报告。

AI 前线公众号
2016-11-16 16:487279
用户头像

发布了 322 篇内容, 共 133.7 次阅读, 收获喜欢 142 次。

关注

评论

发布
暂无评论
发现更多内容

最好用的 8 款 React Datepicker 时间日期选择器测评推荐

蒋川

react.js 组件 组件库 低代码平台 Javascript框架

【ELT.ZIP】OpenHarmony啃论文俱乐部——电子设备软件更新压缩

ELT.ZIP

鸿蒙 rsync 数据压缩 ELT.ZIP

计算机网络——物理层

工程师日月

计算机网络 5月月更

拆分电商系统为微服务

danny_xian

Go 依赖注入 Wire 详解 - 用户指南

baiyutang

Go 微服务 依赖注入 5月月更

代码之外:谈谈算法该怎么准备,不准备可以吗

宇宙之一粟

算法面试 代码之外 5月月更

架构实战营模块 6 作业

热猫

架构模块六

小马

「架构实战营」

运动健康深入人心,MOVE PROTOCOL引领品质生活

BlockChain先知

OKALEIDO解决NFT流动性不足难题,更有创新平台通证分配方案

股市老人

【ELT.ZIP】OpenHarmony啃论文俱乐部——人工智能短字符串压缩

ELT.ZIP

人工智能 鸿蒙 数据压缩 ELT.ZIP

拆分电商系统为微服务

踩着太阳看日出

架构训练营

flask框架的学习笔记【二】

恒山其若陋兮

5月月更

HashMap 源码分析-基础结构

zarmnosaj

5月月更

模块六:拆分电商系统为微服务

jiaoxn

「架构实战营」

为了兼容IE,配置Babel+Webpack

空城机

webpack 5月月更

九、云原生链路追踪

穿过生命散发芬芳

链路追踪 5月月更

今天要学习的技术点,Python 筛选数字,模块导入,特殊变量__all__ 实战博客

梦想橡皮擦

5月月更

绿色环保作为经济长线主题,MOVE PROTOCOL运动APP来助力

股市老人

Java Core「2」synchronized 关键字

Samson

学习笔记 5月月更 Java core

在线火星文转换器工具

入门小站

工具

模块六作业: 拆分电商系统为微服务

凯博无线

架构实战营 - 模块六 - 作业

michael

架构实战营 #架构实战营 架构师实战营 「架构实战营」

M_6: 拆分电商系统为微服务

Jadedev

架构训练营

位运算小妙招-求二进制序列中0的个数

芒果酱

数据结构 算法 5月月更

拆分电商系统为微服务

流火

单片机开发入门知识介绍

DS小龙哥

5月月更

【ELT.ZIP】OpenHarmony啃论文俱乐部——多层存储分级数据压缩

ELT.ZIP

鸿蒙 数据压缩 ELT.ZIP HCompress

拆分电商系统为微服务

大眼喵

「架构实战营」

在线HTML转XML工具

入门小站

工具

AI简报-逆光也清晰-色彩增强算法CURL

AIWeker

人工智能 深度学习 5月月更 AI简报

RXJava实例解析_Java_Victor Grazi_InfoQ精选文章