NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

携程基于 Quasar 协程的 NIO 实践

  • 2020-09-03
  • 本文字数:6236 字

    阅读完需:约 20 分钟

携程基于Quasar协程的NIO实践

IO 密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA 上成熟的非阻塞 IO(NIO)技术可解决该问题。目前 Java 项目对接 NIO 的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来 Golang、Kotlin 等语言的协程(Coroutine)能达到高性能与可读性的兼顾。


本文利用开源的 Quasar 框架提供的协程对系统进行 NIO 改造,解决以下两个问题:


1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。


2)使用更轻量的协程同步等待 IO,替代处理 NIO 常用的异步回调。

一、Java 异步编程与非阻塞 IO

本文改造的系统处理来自前台的任务,通过 HTTP 请求对端服务,还通过 RPC 调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。



基于 epoll 的 NIO 框架 Netty 在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理 IO 的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。

1.1 Java 中的异步工具

Java 项目大多使用 JDK8,除线程外可以获得的异步的编程支持包括 CompletableFuture,以及开源的 RxJava、Vert.x 等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。


如下为将一段简单的逻辑判断使用 CompletableFuture 进行异步改造后的对比。原始版本使用 getA 方法获得第一步的请求结果,根据其相应选择使用 getB1 还是 getB2 获取第二步的响应作为结果。


HttpResponse a = getA();
HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();}
String ans=b.getBody();
复制代码


首先将三个获取响应的方法改为异步。此处假设 getB1 与 getB2 内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。


private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();
复制代码


然后使用 CompletableFuture 的链式调用,将两个步骤组合起来:


String ans = getA()        .thenCompose(a -> {            if (a.getBody().equals("1")) {                return getB1();            } else {                return getB2();            }        }).get()        .getBody();
复制代码


使用 CompletableFuture 的链式回调后,代码变得不友好。RxJava 等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于 if/else、switch/case,乃至 while/for、break/continue 这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO 使用。当时使用 NIO 时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。

1.2 协程

协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。


和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。



协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。

1.3 Quasar 任务调度原理

Quasar(https://github.com/puniverse/quasar)是一个开源的 Java 协程框架,通过利用 Java instrument 技术对字节码进行修改,使方法挂起前后可以保存和恢复 JVM 栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为 initial 初始化,第二部为通过 NIO 获取网络响应。


public String instrumentDemo(){    initial();    String ans = getFromNIO();    return ans;}
复制代码



Quasar 会在 initial 前增加一个 flag 字段,表明当前方法执行的位置。第一次执行方法时,检查到 flag 为 0,修改 flag 为 1 并继续往下执行 initial 方法。执行 getFromNIO 方法前插入字节码指令将栈帧中的数据全部保存在一个 Quasar 自定义的栈结构中,在执行 getFromNIO 后,挂起协程,让出线程资源。直至 NIO 异步完成后,协程调度器将第二次执行该方法,检测到 flag 为 1,将会调用 jump 指令跳转到 returnans 语句前,并将保存的栈结构还原到当前栈中,最后调用人 return ans 语句,方法执行完毕。

二、系统异步 IO 改造

在项目中添加 Quasar 依赖后,可以使用 Fiber 类新建协程。建立的方法与线程类似。


new Fiber(()->{    //方法体}).start();
复制代码

2.1 整合 Netty 与 Quasar

系统使用的 Http 框架是基于 Netty 的 async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和 CompletableFuture 两种对响应的异步处理方式。


CompletableFuture 自 JDK8 推出,与之前的 Future 类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在 CompletableFuture 注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr 框架对它也做了支持,提供了 API 用于在协程中等待 CompletableFuture 的结果。调用后,协程将挂起,直至 future 状态为已完成。


AsyncCompletionStage.get(future)
复制代码


通过 CompletableFuture 作为通知中介,我们可以将 AsyncHttpClient 与 Quasar 做整合,挂起协程等待 IO 结果。


//创建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//创建请求Request request = createRequest();//将网络请求交给HttpClient执行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通过Quasar挂起协程Response response = AsyncCompletionStage.get(future);//获取网络结果后,通过future传递response并唤醒协程重新执行deal(response);
复制代码


过程可由下图表示。



Quasar 框架 AsyncCompletionStage.get 内部完成的工作相当于,在 HttpClient 返回的 future 上注册回调,回调的内容是“IO 操作完成后通知调度器唤醒协程”,这样将 NIO 异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。

2.2 声明挂起方法

Quasar 需要织入字节码接管挂起方法的调度,在项目主 pom 下添加 quasar-maven-plugin 插件,该插件将在编译后的 class 文件中修改字节码。


<plugin>    <groupId>com.vlkan</groupId>    <artifactId>quasar-maven-plugin</artifactId>    <version>0.7.9</version>    <executions>        <execution>            <goals>                <goal>instrument</goal>            </goals>        </execution>    </executions></plugin>
复制代码


Quasar 通过识别方法是否抛出了该框架定义的 SuspendExecution 异常决定是否修改字节码。Quasar 框架在 AsyncCompletionStage.get 方法上声明了 SuspendExceution 异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在 try/catch 语句时,也必须抛出该异常。


public void startFiber() throws ExecutionException, InterruptedException {    Fiber<Void> fiber = new Fiber<Void>(() -> {        //不用继续抛出异常        Response response = waitNextLayer1();        deal(response);    }).start();}
private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();}
private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具类抛出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.3 异步 RPC 调用

目前主流的 RPC 框架都基于 NIO 实现,支持异步回调,有的 RPC 框架已经直接提供了返回 CompletableFuture 或 ListenableFuture(Guava 工具类提供)的异步接口,通过使用 ComplatableFuture,可以按前文类似的方法将 Quasar 与 RPC 框架结合起来。当 RPC 框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:


interface Callback<TResponse> {    void callback(TResponse TResponse, Exception e);}
复制代码


这种情况,可以使用者自己创建 ComplatableFuture,在回调中设置其状态,并调用 AsyncCompletionStage.get 等待这个 future。


CompletableFuture<Response> future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback<Response>() {    public void callback(Response response, Exception e) {        if (e == null) future.complete(response);        else future.completeExceptionally(e);    }});//在此处调用Quasar的API,挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);
复制代码


上述代码依然具有异步回调不直观的缺点,通过 JDK8 的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。


@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> {    void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {    CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); });
try { //使用Quasar等待Future结果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


最后的调用可简化一行代码,该方法适用于所有该 Rpc 框架提供的异步接口。


Response response= waitRpc(new RpcClient()::helloAsync, request);
复制代码

2.4 阻塞操作的处理

Quasar 协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞 IO 的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。


public void waitBlocking() throws SuspendExecution {    //从DB获取结果    String ans = waitBlocking(this::selectFromDB);}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); });
try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.5 并发工具的使用

协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在 synchronized 同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现 IllegalMonitorStateException 异常。



但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。


JDK 并发包中的工具可分为两类,一类是 Lock、Semaphore、CountDownLatch 等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如 await 时,可能导致协程的执行线程中发生阻塞。

三、总结

系统运行在 4 核心的主机上,线程池构成如下。



业务逻辑运行在 Quasar 的协程调度线程池中,线程池大小为 CPU 核数。HTTP 请求与 RPC 调用均通过内部的 NIO 线程池管理。此外定义了一个 core size 为 8 的可伸缩的线程池用于少量消息队列、DB 等阻塞 IO 的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。


改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而 CPU 利用率也从平均 5%以下提升至 10%-60%,在瞬时与高峰流量下能保持稳定。集群 CPU 核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至 1/5。

3.1 限制与风险

Quasar 协程不是 Java 的语言标准,没有 JVM 层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。


代码的 try/catch 时可能同时捕获 SuspendExecution 异常,从而忘记标记方法,此方法字节码不会被修改,结合 Quasar 的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加 SuspendExecution 标记。


在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread 的构造方法中传入的是 Runnable 接口对象,其 run 方法没有声明 SuspendExecution 异常,run 内部的语句不会被织入字节码,造成上述异常。

3.2 总结与展望

协程使得 NIO 能够更好地应用在 Java 中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。


异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在 2018 年创建了 Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在 JVM 上实现轻量级的线程,并解除 JVM 线程与内核线程的映射。相信会给 Java 生态带来巨大的改变。


作者介绍


Ryan,携程 Java 开发工程师,对高并发、网络编程等领域有浓厚兴趣。


本文转载自公众号携程技术(ID:ctriptech)。


原文链接


携程基于Quasar协程的NIO实践


2020-09-03 10:042738

评论 2 条评论

发布
用户头像
https://github.com/puniverse/quasar/issues/350

quasar代码的最后一次提交还是在2018年,项目看起来已经废弃了
2022-02-25 12:04
回复
用户头像
c# 的await模型不香吗
2020-09-04 09:00
回复
没有更多了
发现更多内容

七日算法先导(二)——双指针

工程师日月

8月月更

ABAP-OOAVL模板程序

桥下本有油菜花

abap

数据湖(一):数据湖概念

Lansonli

数据湖 8月月更

技术分享| 融合调度系统中的电子围栏功能说明

anyRTC开发者

音视频 快对讲 语音对讲 视频对讲 电子围栏

数据湖(三):Hudi概念术语

Lansonli

数据湖 8月月更

受邀出席Rust开发者大会|Rust如何助力量化高频交易?

非凸科技

量化策略 量化交易

开源一夏 |Spring boot 自动配置原理

叶秋学长

开源 8月月更

【云原生】jenkins部署docker镜像到远程服务器

青芒果

云原生 CODING Linxu docekr 8月月更

实践GoF的设计模式:迭代器模式

华为云开发者联盟

后端 开发

SpringBoot 发送邮件

开源 SpringBoot 2 8月月更

C语言结构体(入门)

孤衫

编程语言 C语言 结构体 8月月更

看我如何用多线程,帮助运营小姐姐解决数据校对系统变慢!

华为云开发者联盟

后端 开发

IIR数字滤波器设计(数字信号处理)

Five

matlab 过滤器 8月月更

如何在Linux中不解压就能查看压缩包中的内容,这13个命令非常强!

wljslmz

Linux 签约计划第三季 8月月更

Spring Controller

武师叔

8月月更

软件成分分析:手握5大能力守护软件供应链安全

华为云开发者联盟

云计算 安全 华为云

小程序插件的生态丰富,加速开发建设效率

Speedoooo

小程序 小程序容器 小程序插件

8大软件供应链攻击事件概述

SEAL安全

开源 DevOps DevSecOps 软件供应链安全 软件供应链攻击

中原银行实时风控体系建设实践

Apache Flink

大数据 flink 编程

前端面试高频20道手写题(二)

helloworld1024fd

Vite2 + Vue3 + TypeScript + Pinia 搭建一套企业级的开发脚手架

小周先生

开源 Vite2 vue3.0 #开源 8月月更

基于threejs的商品VR展示平台的设计与实现思路

Five

js vr three.js 签约计划第三季 8月月更

如何通过DBeaver 连接 TDengine?

TDengine

数据库 tdengine Dbeaver

基于深度学习的裂缝检测技术

阿炜小菜鸡

8月月更 裂缝监测

半夜赶工制作简报的我好想说 : 确定了,最终稿就是这样

叶小鍵

数据湖(二):什么是Hudi

Lansonli

数据湖 8月月更

节省50%成本!京东云重磅发布新一代混合CDN产品

京东科技开发者

云计算 CDN DDoS 混合云

React高频面试题合集(二)

helloworld1024fd

React

前端面试高频20道手写题

helloworld1024fd

主流跨端技术一览

Speedoooo

前端框架 小程序容器

分享一个Chrome控制台数据获取的例子

睡着了去做梦

Go Chrome开发者工具

携程基于Quasar协程的NIO实践_开源_Ryan_InfoQ精选文章