最新发布《数智时代的AI人才粮仓模型解读白皮书(2024版)》,立即领取! 了解详情
写点什么

测试 RxJava

  • 2016-11-03
  • 本文字数:9057 字

    阅读完需:约 30 分钟

关键要点:

  • RxJava 含有内建的、测试友好的解决方案。
  • 使用 TestSubscriber 去验证 Observable。
  • 使用 TestScheduler 可实现对时间的严格控制。
  • Awaitility 库提供了对测试环境进一步的控制。

你已经阅读过 RxJava 的相关内容,也已经在互联网上体验过像“ RxJava by Example ”中的那些示例,现在打算在自己的代码中探索一下响应式编程了。但是,现在却一直困扰着如何测试那些可能会在代码库中发现的新功能呢?

使用响应式编程,就必须转变对给定问题的推理方式,因为我们要聚焦于作为事件流的流动数据,而非个别数据项。事件通常是被不同的线程所产生和消费,因此在编写测试时必须要对并发问题有着清晰的认识。幸运的是,RxJava 提供了测试 Observable 和 Subscription 的内建支持,并且是直接构建于 RxJava 的核心依赖中。

第一步

让我们回顾一下在“ RxJava by Example ”一文中所给出的那个词汇的例子,看一下如何对该例子作测试。让我们从基础测试工具的设置开始。在我们的测试架构中,使用了 JUnit 作为测试工具。

复制代码
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.*;
import org.junit.Test;
import static org.junit.Assert.assertThat;
public class RxJavaTest {
private static final List<String> WORDS = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
}

事实上在没有给定调度器(Scheduler)的情况下,Subscription 将默认运行于调用线程上。因此我们将在首个测试中使用原生的方法。这意味着我们可实现一个 Subscription 接口的对象,在 Subscription 发生后就立刻对其状态做断言(assert)。

复制代码
@Test
public void testInSameThread() {
// given:
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(results::add);
// then:
assertThat(results, notNullValue());
assertThat(results, hasSize(9));
assertThat(results, hasItem(" 4. fox"));
}

注意这里使用了显式的 List容器,与实际订阅者一起累计结果。由于给定的测试很简单,所以可能会使你认为这种显式累加器的方法已经足够好了。但是切记产品级的 Observable 中可能封装了错误或可能产生意外的事件。例子中的 Subscriber 与累加器的简单组合并不足以覆盖这种情况。但不用为此烦恼,RxJava 提供的 TestSubscriber 类型就是用于处理这种情况的。下面我们使用 TestSubscriber 类型重构上面的测试。

复制代码
@Test
public void testUsingTestSubscriber() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

TestSubscriber 不仅可替代用户累加器,还另给出了一些行为。例如它能够给出接收到的消息和每个事件相关数据的规模,它也可对 Subscription 被完成且在 Observable 消费期间没有错误出现的状态做断言。虽然当前测试中的 Observable 并未生成任何的错误,但是回到“ RxJava by Example ”一文,我们从中得知了 Observable 将例外与数据事件等同对待。我们可通过如下的方式通过连接例外事件而模拟错误:

复制代码
@Test
public void testFailure() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Exception exception = new RuntimeException("boom!");
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string))
.concatWith(Observable.error(exception));
// when:
observable.subscribe(subscriber);
// then:
subscriber.assertError(exception);
subscriber.assertNotCompleted();
}

在我们所给出的有限用例中,所有的机制运行良好。但是实际的产品代码可能会完全不同于例子。因此在下文中,我们将考虑一些更加复杂的产品实例。

定制调度器(Scheduler)

在产品代码中,很多用例中的 Observable 都是在特定的线程上执行,这种线程在响应式编程环境中被称为“调度器(Scheduler)”。很多 Observable 操作将可选的调度器参数作为附加参数使用。RxJava 定义了一系列任何时候都可用的命名调度器,包括 IO 调度器(io)、计算调度器(computation,为共享线程)和新线程调度器(newThread)。开发人员也可去实现个人定制的调度器。让我们通过指定计算调度器来修改 Observable 的代码吧。

复制代码
@Test
public void testUsingComputationScheduler() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable
.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

当运行时就会立刻发现该代码是存在问题的。Subscriber 在测试线程上执行其断言,但是 Observable 在后台线程(计算线程)上生成值。这意味着执行 Subscriber 断言可能先于 Observable 生成所有相关事件,因而导致测试的失败。

为使测试顺利执行,有如下的一些策略可选:

  • 将 Observable 转化为阻塞式的。
  • 强制测试等待,直至给定的条件被满足。
  • 将计算调度器转换为即刻(Schedulers.immediate())调度器。

我们将对每个策略做展开介绍,但将从“将 Observable 转化为阻塞式”开始,因为实现该策略所需做的技术工作最少,这些工作与所使用的调度器无关。我们假设数据在后台线程中生成,这将导致 Subscriber 从同一后台线程得到通知。

我们要做的是强制生成所有的事件,并在下一个声明被执行前就在测试中完成 Observable。这是通过在 Observable 自身上调用 toBlocking() 方法实现的。

复制代码
@Test
public void testUsingBlockingCall() {
// given:
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
Iterable<String> results = observable
.subscribeOn(Schedulers.computation())
.toBlocking()
.toIterable();
// then:
assertThat(results, notNullValue());
assertThat(results, iterableWithSize(9));
assertThat(results, hasItem(" 4. fox"));
}

该方法虽然适用于我们所给出的简单代码,但可能并不适用于实际的产品代码。如果生产者生成所有的数据需要很长的时间,那将会产生什么后果?这将使测试变得非常慢,并增加了编译时间,还可能会有其它的性能问题。这里我推荐一个便利的程序库,就是 Awaitility 。简单地说,Awaitility 是一个以精确、简单易读的方式对异步系统相关期望进行表述的 DSL。在项目中可以用 Maven 添加 Awaitility 的依赖关系。

复制代码
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>

或是使用 Gradle:

testCompile 'org.awaitility:awaitility:2.0.0'Awaitility DSL 的接入点是 org.awaitility.Awaitility.await() 方法(参见下面例子中的第 13 和 14 行代码)。可以使用 Awaitility 定义使测试继续所必须达成的条件,也可在条件中加入超时或其它的时序约束,例如最小、最大或持续范围。对于上面的例子,下面的代码给出了如何在结果中使用 Awaitility:

复制代码
@Test
public void testUsingComputationScheduler() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
await().timeout(2, SECONDS)
.until(subscriber::getValueCount, equalTo(9));
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

此版本测试并未以任何方式改变 Observable 的本质,这使得你做测试时不必对产品代码做任何改动。该版本测试使用最多 2 秒的等待时间通过检查 Subscriber 状态使 Observable 执行其作业。如果一切进行顺利,在 2 秒内就可将 Subscriber 的状态释放给所有的 9 个事件。

Awaitility 具有和 Hamcrest 的匹配符、Java 8 的 lambda 表达式和方法引用等的良好协作,从而给出精确的和可读的测试条件。Awaitility 还提供了预制扩展,用于那些被广泛使用的 JVM 语言,其中包括 Groovy 和 Scala。

我们要给出最后一个策略中使用了 RxJava 的扩展机制,该扩展是以 RxJava API 的组成部分发布的。RxJava 中定义了一系列的扩展点,允许对几乎任何默认的 RxJava 行为进行微调。这种扩展机制使我们可以针对特定的 RxJava 特性提供修改过的值。利用该机制,在无需关心生成代码中所指定的调度器的情况下,我们可在测试中注入选定的调度器。这正是我们所寻找的方法,该方法被封装在 RxJavaHooks 类中。假设产品代码依赖于计算调度器,我们将覆盖它的默认值,返回一个调度器,它作为被调用的代码使事件处理发生,这是即刻调度器(Schedulers.immediate())。下面给出测试的代码:

复制代码
@Test
public void testUsingRxJavaHooksWithImmediateScheduler() {
// given:
RxJavaHooks.setOnComputationScheduler(scheduler -> Schedulers.immediate());
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
try {
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
} finally {
RxJavaHooks.reset();
}
}

在测试中,产品代码察觉不到计算调度器是即刻的。请注意钩子函数必须被重置,否则即刻调度器的设置可能会发生泄漏,导致在各处的测试被破坏。使用 try/finall 代码块会在一定程度上模糊了测试的目的,但是幸运的是我们可以使用 JUnit 规则重构该行为,使测试更加精炼,结果更可读。下面给出使用上述规则的一种可能的实现代码:

复制代码
public class ImmediateSchedulersRule implements TestRule {
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaHooks
.setOnIOScheduler(scheduler -> Schedulers.immediate());
RxJavaHooks
.setOnComputationScheduler(scheduler -> Schedulers.immediate());
RxJavaHooks
.setOnNewThreadScheduler(scheduler -> Schedulers.immediate());
try {
base.evaluate();
} finally {
RxJavaHooks.reset(); }
}
};
}
}

此外,我们还对另外两个调度器的生成方法做了重写。该规则对此后其它的测试目标更为通用。在新的测试用例类中,该规则的使用方法很直接,只需简单地定义一个域,并将其中新类型标注为 @Rule 即可。示例代码如下:

复制代码
@Rule
public final ImmediateSchedulersRule schedulers = new ImmediateSchedulersRule();
@Test
public void testUsingImmediateSchedulersRule() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

最终我们可得到与前面测试一样的行为,却没有像前面测试那样的杂乱。下面用一些篇幅来回顾一下我们目前已经做到的事情:

  • Subscribers 将在同一线程中处理数据,只要没有使用特定的调度器。这意味着在 Subscriber 向 Observable 做订阅后,我们就可在该 Subscriber 上做断言。
  • TestSubscriber 可累计事件,并给出自身状态的追加断言。
  • 任何 Observable 都可转换为阻塞式的,这使得无论 Observable 使用何种调度器,我们都可以同步等待事件的生成。
  • RxJava 提供了扩展机制,允许开发人员重写其默认方法,并以适当的方式注入到产品代码中。
  • 并发代码可使用 Awaitility DSL 测试。

上述的每个技术都作用于不同的场景中,但是所有技术都是通过“共同的线程”(译者注:作者在原文中指出 common thread 是作为双关语使用的,其另一个意思是“类似的思路”)相关联:在对 Subscriber 状态做断言之前,测试代码需等待 Observable 完成。考虑到 Observable 的行为会生成数据,是否有方法对该行为进行检查呢?换句话说,是否可以用编程的方式做 Observable 的现场调试?我们将在后文中给出这样的技术。

操控时间

到目前为止我们已用黑箱方式测试了 Observable 和 Subscription。下面我们将考虑另外一种操控时间的技术,该技术使我们可以在 Observable 依然处于活动状态时,打开引擎盖去查看 Subscriber 状态。换句话说,我们将使用采用了 RxJava 的 TestScheduler 类白箱测试技术,这可以说是 RxJava 再一次来救场。这种特定的调度器可精确地设定时间的内部使用方式,例如可将时间提前半秒,或是使时间跳跃 5 秒。我们将首先给出这种新调度器实例的创建方法,然后再讨论代码的测试。

复制代码
@Test
public void testUsingTestScheduler() {
// given:
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, SECONDS, scheduler);
Observable<String> observable = Observable.from(WORDS)
.zipWith(tick, (string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(scheduler)
.subscribe(subscriber);
// expect:
subscriber.assertNoValues();
subscriber.assertNotCompleted();
// when:
scheduler.advanceTimeBy(1, SECONDS);
// then:
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues(" 0. the");
// when:
scheduler.advanceTimeTo(9, SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
}

该“产品”代码有了略微的改变,这是由于我们使用了绑定到调度器时隙(interval())的方法生成计数(第 6 行),而非生成一个计数的范围。但这样做具有一个副作用,就是计数是从零开始生成的,而非从 1 开始。一旦配置了 Observable 和测试调度器,我们立刻做出这样的断言,即假定 Subscriber 不具有值(第 15 行)且没有被完成或生成任何的错误(第 16 行)。这是一个完整性测试,因为此时调度器并没有被移动,因而没有任何值被 Observable 产生或是被 Subscriber 接收到。

下面将时间向前调 1 整秒(第 19 行),该操作将会导致 Observable 生成第一个值,这正是随后的断言集所要检查的(第 22 到 24 行)。

下面将时间从当前时间调到 9 秒。需要注意的是,这意味着将时间准确地调整为调度器启动后的第 9 秒(并非是向前调 1 秒后再向前调 9 秒,即调度器检查启动后的第 10 秒)。换句话说,advanceTimeBy() 方法将调度器的时间调整为相对于当前位置的时间,而 advanceTimeTo() 以绝对的方式调整时间。此后我们做出下一轮的断言(第 28 到 20 行),用于确保所有的数据由 Observable 生成且被 Subscriber 消费。另一件需要说明的事情就是使用 TestScheduler 时,真实的时间是立刻发生调整的,这着意味着测试并不用实际等待 9 秒才去完成。

正如你所看到的,该调度器的使用是非常便利的,仅需将该调度器提供给正在测试的 Observable 即可。但是对使用了指定类型调度器的 Observable,该调度器并不能很好地适用。但是稍等一下,之前我们看到的是如何使用 RxJavaHooks 切换一个不影响生产代码的调度器,而这一次是提供一个代替即刻调度器的 TestScheduler(第 13 到 15 行)。我们甚至可以 apply 定制 JUnit 规则同样的技术,使之前的代码可以用更重用的方式予以重写。首先该新规则为:

复制代码
public class TestSchedulerRule implements TestRule {
private final TestScheduler testScheduler = new TestScheduler();
public TestScheduler getTestScheduler() {
return testScheduler;
}
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaHooks.setOnIOScheduler(scheduler -> testScheduler);
RxJavaHooks.setOnComputationScheduler(scheduler -> testScheduler);
RxJavaHooks.setOnNewThreadScheduler(scheduler -> testScheduler);
try { base.evaluate(); }
finally { RxJavaHooks.reset(); }
}
};
}
}

紧接着是实际的测试代码(在一个新的测试用例类中),去使用我们的测试规则:

复制代码
@Rule
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
@Test
public void testUsingTestSchedulersRule() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.interval(1, SECONDS),
(string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// expect
subscriber.assertNoValues();
subscriber.assertNotCompleted();
// when:
testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS);
// then:
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues(" 0. the");
// when:
testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
}

这样你就成功地实现了它。使用经由 RxJavaHooks 注入 TestScheduler 的方法,可在无需更改原始 Observable 组合的情况下编写测试代码,此外它给出了一种在 observable 自身执行期间改变时间、并在特定点上做断言的方法。在本文中给出的所有这些技术,应该足够你选择用来测试 RxJava 的代码了。

未来

RxJava 是最先为 Java 提供响应式编程能力的程序库之一。为了使 RxJava API 更好地符合 Reactive Streams 规范,即将推出的 2.0 版将会是重新设计的。Reactive Streams 规范以 Java 和 JavaScript 运行时为目标,提供了使用非阻塞背压机制(back pressure)的异步流处理标准。这意味着下一版的 RxJava 中将会出现一些 API 改进。对这些改进的详细描述参见 RxJava wiki

对于测试而言,这些核心类型(Observable、Maybe 和 Single)现在都给出了便利易用的 test() 方法,实现现场创建 TestSubscriber 实例。也可在 TestSubscriber 上链接方法调用,对这类用法也有一些新的断言方法。

关于作者

Andres Almiray是一位 Java/Groovy 开发者和 Java 冠军程序员,具有超过 17 年的软件设计和开发经验。他在 Java 早期推出的时代就参与了 Web 和桌面应用的开发。他是开源软件的忠实信徒,参与了 Groovy、JMatter、Asciiidoctor 等广为人知的项目。他是 Griffon 架构的创始者和现任项目领导者,还是 JSR 377 规范的牵头者。

查看英文原文: Testing RxJava


感谢冬雨对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们。

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2016-11-03 18:273951
用户头像

发布了 227 篇内容, 共 71.4 次阅读, 收获喜欢 27 次。

关注

评论

发布
暂无评论
发现更多内容

开源项目中的设计模式

dony.zhang

架构师训练营第三周学习总结:面向对象设计和设计模式

hifly

设计模式 极客大学架构师训练营 OOD SOLID 策略模式

代码重构总结

Lane

极客大学架构师训练营

架构师week3作业

平淡人生

设计模式-单例与组合

ashuai1106

架构师 极客大学架构师训练营

架构师训练营第三周总结

架构师 极客大学架构师训练营

架构师训练营-第三周-20200624-单例模式和组合模式

丁亚宁

极客大学架构师训练营 课程作业

架构师课程第三周总结

dongge

设计模式示例

imicode

一周信创舆情观察(6.15~6.21)

统小信uos

新基建 信创 matlab 舆情

架构师训练营第三周学习总结

Bruce Xiong

架构师之面向对象的设计模式

彭阿三

都在讲DevOps,但你知道它的发展趋势吗?

禅道项目管理

DevOps 自动化

架构师第三周作业

G小调

基于Jira的产品需求全生命周期管理实践

跟YY哥学Jira

项目管理 需求管理 Agile Jira Reports

架构师训练营第三周总结

王鑫龙

第三周作业

作业一

Thrine

第三周学习总结

G小调

架构师训练营-第三周-20200624-学习总结

丁亚宁

极客大学架构师训练营

Week3学习总结

熊威

第三周设计模式作业

我嗅到了数据开发工程师的危机

无箭的丘比特

大数据 数据仓库 数据分析 数据开发

组合模式

俊俊哥

设计模式 组合模式

插入排序

wjchenge

插入排序

组合模式实现树结构

新世界

深入理解JVM垃圾回收机制 - 引用类型

SkyeDance

深入理解JVM 强引用 软引用 弱引用 虚引用

架构师week3总结

平淡人生

第三周作业:设计模式

Larry

架构师训练营第三周 - 总结

Larry

架构师训练营——第三周学习总结

jiangnanage

测试RxJava_Java_Andres Almiray_InfoQ精选文章