【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

携程基于 Quasar 协程的 NIO 实践

  • 2020-09-03
  • 本文字数:6236 字

    阅读完需:约 20 分钟

携程基于Quasar协程的NIO实践

IO 密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA 上成熟的非阻塞 IO(NIO)技术可解决该问题。目前 Java 项目对接 NIO 的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来 Golang、Kotlin 等语言的协程(Coroutine)能达到高性能与可读性的兼顾。


本文利用开源的 Quasar 框架提供的协程对系统进行 NIO 改造,解决以下两个问题:


1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。


2)使用更轻量的协程同步等待 IO,替代处理 NIO 常用的异步回调。

一、Java 异步编程与非阻塞 IO

本文改造的系统处理来自前台的任务,通过 HTTP 请求对端服务,还通过 RPC 调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。



基于 epoll 的 NIO 框架 Netty 在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理 IO 的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。

1.1 Java 中的异步工具

Java 项目大多使用 JDK8,除线程外可以获得的异步的编程支持包括 CompletableFuture,以及开源的 RxJava、Vert.x 等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。


如下为将一段简单的逻辑判断使用 CompletableFuture 进行异步改造后的对比。原始版本使用 getA 方法获得第一步的请求结果,根据其相应选择使用 getB1 还是 getB2 获取第二步的响应作为结果。


HttpResponse a = getA();
HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();}
String ans=b.getBody();
复制代码


首先将三个获取响应的方法改为异步。此处假设 getB1 与 getB2 内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。


private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();
复制代码


然后使用 CompletableFuture 的链式调用,将两个步骤组合起来:


String ans = getA()        .thenCompose(a -> {            if (a.getBody().equals("1")) {                return getB1();            } else {                return getB2();            }        }).get()        .getBody();
复制代码


使用 CompletableFuture 的链式回调后,代码变得不友好。RxJava 等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于 if/else、switch/case,乃至 while/for、break/continue 这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO 使用。当时使用 NIO 时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。

1.2 协程

协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。


和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。



协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。

1.3 Quasar 任务调度原理

Quasar(https://github.com/puniverse/quasar)是一个开源的 Java 协程框架,通过利用 Java instrument 技术对字节码进行修改,使方法挂起前后可以保存和恢复 JVM 栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为 initial 初始化,第二部为通过 NIO 获取网络响应。


public String instrumentDemo(){    initial();    String ans = getFromNIO();    return ans;}
复制代码



Quasar 会在 initial 前增加一个 flag 字段,表明当前方法执行的位置。第一次执行方法时,检查到 flag 为 0,修改 flag 为 1 并继续往下执行 initial 方法。执行 getFromNIO 方法前插入字节码指令将栈帧中的数据全部保存在一个 Quasar 自定义的栈结构中,在执行 getFromNIO 后,挂起协程,让出线程资源。直至 NIO 异步完成后,协程调度器将第二次执行该方法,检测到 flag 为 1,将会调用 jump 指令跳转到 returnans 语句前,并将保存的栈结构还原到当前栈中,最后调用人 return ans 语句,方法执行完毕。

二、系统异步 IO 改造

在项目中添加 Quasar 依赖后,可以使用 Fiber 类新建协程。建立的方法与线程类似。


new Fiber(()->{    //方法体}).start();
复制代码

2.1 整合 Netty 与 Quasar

系统使用的 Http 框架是基于 Netty 的 async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和 CompletableFuture 两种对响应的异步处理方式。


CompletableFuture 自 JDK8 推出,与之前的 Future 类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在 CompletableFuture 注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr 框架对它也做了支持,提供了 API 用于在协程中等待 CompletableFuture 的结果。调用后,协程将挂起,直至 future 状态为已完成。


AsyncCompletionStage.get(future)
复制代码


通过 CompletableFuture 作为通知中介,我们可以将 AsyncHttpClient 与 Quasar 做整合,挂起协程等待 IO 结果。


//创建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//创建请求Request request = createRequest();//将网络请求交给HttpClient执行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通过Quasar挂起协程Response response = AsyncCompletionStage.get(future);//获取网络结果后,通过future传递response并唤醒协程重新执行deal(response);
复制代码


过程可由下图表示。



Quasar 框架 AsyncCompletionStage.get 内部完成的工作相当于,在 HttpClient 返回的 future 上注册回调,回调的内容是“IO 操作完成后通知调度器唤醒协程”,这样将 NIO 异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。

2.2 声明挂起方法

Quasar 需要织入字节码接管挂起方法的调度,在项目主 pom 下添加 quasar-maven-plugin 插件,该插件将在编译后的 class 文件中修改字节码。


<plugin>    <groupId>com.vlkan</groupId>    <artifactId>quasar-maven-plugin</artifactId>    <version>0.7.9</version>    <executions>        <execution>            <goals>                <goal>instrument</goal>            </goals>        </execution>    </executions></plugin>
复制代码


Quasar 通过识别方法是否抛出了该框架定义的 SuspendExecution 异常决定是否修改字节码。Quasar 框架在 AsyncCompletionStage.get 方法上声明了 SuspendExceution 异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在 try/catch 语句时,也必须抛出该异常。


public void startFiber() throws ExecutionException, InterruptedException {    Fiber<Void> fiber = new Fiber<Void>(() -> {        //不用继续抛出异常        Response response = waitNextLayer1();        deal(response);    }).start();}
private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();}
private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具类抛出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.3 异步 RPC 调用

目前主流的 RPC 框架都基于 NIO 实现,支持异步回调,有的 RPC 框架已经直接提供了返回 CompletableFuture 或 ListenableFuture(Guava 工具类提供)的异步接口,通过使用 ComplatableFuture,可以按前文类似的方法将 Quasar 与 RPC 框架结合起来。当 RPC 框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:


interface Callback<TResponse> {    void callback(TResponse TResponse, Exception e);}
复制代码


这种情况,可以使用者自己创建 ComplatableFuture,在回调中设置其状态,并调用 AsyncCompletionStage.get 等待这个 future。


CompletableFuture<Response> future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback<Response>() {    public void callback(Response response, Exception e) {        if (e == null) future.complete(response);        else future.completeExceptionally(e);    }});//在此处调用Quasar的API,挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);
复制代码


上述代码依然具有异步回调不直观的缺点,通过 JDK8 的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。


@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> {    void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {    CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); });
try { //使用Quasar等待Future结果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


最后的调用可简化一行代码,该方法适用于所有该 Rpc 框架提供的异步接口。


Response response= waitRpc(new RpcClient()::helloAsync, request);
复制代码

2.4 阻塞操作的处理

Quasar 协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞 IO 的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。


public void waitBlocking() throws SuspendExecution {    //从DB获取结果    String ans = waitBlocking(this::selectFromDB);}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); });
try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.5 并发工具的使用

协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在 synchronized 同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现 IllegalMonitorStateException 异常。



但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。


JDK 并发包中的工具可分为两类,一类是 Lock、Semaphore、CountDownLatch 等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如 await 时,可能导致协程的执行线程中发生阻塞。

三、总结

系统运行在 4 核心的主机上,线程池构成如下。



业务逻辑运行在 Quasar 的协程调度线程池中,线程池大小为 CPU 核数。HTTP 请求与 RPC 调用均通过内部的 NIO 线程池管理。此外定义了一个 core size 为 8 的可伸缩的线程池用于少量消息队列、DB 等阻塞 IO 的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。


改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而 CPU 利用率也从平均 5%以下提升至 10%-60%,在瞬时与高峰流量下能保持稳定。集群 CPU 核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至 1/5。

3.1 限制与风险

Quasar 协程不是 Java 的语言标准,没有 JVM 层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。


代码的 try/catch 时可能同时捕获 SuspendExecution 异常,从而忘记标记方法,此方法字节码不会被修改,结合 Quasar 的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加 SuspendExecution 标记。


在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread 的构造方法中传入的是 Runnable 接口对象,其 run 方法没有声明 SuspendExecution 异常,run 内部的语句不会被织入字节码,造成上述异常。

3.2 总结与展望

协程使得 NIO 能够更好地应用在 Java 中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。


异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在 2018 年创建了 Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在 JVM 上实现轻量级的线程,并解除 JVM 线程与内核线程的映射。相信会给 Java 生态带来巨大的改变。


作者介绍


Ryan,携程 Java 开发工程师,对高并发、网络编程等领域有浓厚兴趣。


本文转载自公众号携程技术(ID:ctriptech)。


原文链接


携程基于Quasar协程的NIO实践


2020-09-03 10:042728

评论 2 条评论

发布
用户头像
https://github.com/puniverse/quasar/issues/350

quasar代码的最后一次提交还是在2018年,项目看起来已经废弃了
2022-02-25 12:04
回复
用户头像
c# 的await模型不香吗
2020-09-04 09:00
回复
没有更多了
发现更多内容

设计模式和七大设计原则不难的

知识浅谈

设计模式 设计原则 9月月更

成为优秀程序员的8种方法

小小怪下士

Java 程序员 职业发展

云图说丨DDoS防护解决方案:DDoS大流量攻击防得住

华为云开发者联盟

云计算 后端 华为云 企业号九月金秋榜

一个代码仓库(免费)与技术点 的故事

八点半的Bruce.D

GitHub Linux 网络服务 GitHub仓库

35岁程序员自荐:我所掌握的架构技术

小小怪下士

Java 程序员 中年危机

腾讯云,DevOps 领导者!

CODING DevOps

腾讯云 DevOps IDC CODING

打破联接壁垒,华为云IoT到底强在哪?

华为云开发者联盟

云计算 后端 物联网 华为云 企业号九月金秋榜

爆肝整理5000字!HTAP的关键技术有哪些?| StoneDB学术分享会#3

StoneDB

数据库 HTAP StoneDB 企业号九月金秋榜 9月月更

PipyJS - 函数式网络编程语言

Flomesh

Service Mesh 服务网格

数字化办公,企业OA软件技术该如何发力?

FinClip

【存疑】爬虫学习中decode问题

Sher10ck

存疑

EasyCV带你复现更好更快的自监督算法-FastConvMAE

阿里云大数据AI技术

深度学习 算法 计算机视觉

ESP32-C3入门教程 基础篇(八、NVS — 非易失性存储库的使用)

矜辰所致

ESP32-C3 9月月更 NVS

元宇宙场景技术实践|虚拟直播间搭建教程

ZEGO即构

音视频开发 元宇宙 虚拟直播

netty原理分析

小小怪下士

Java 编程 程序员 后端 Netty

2.69分钟完成BERT训练!新发CANN 5.0加持

华为云开发者联盟

人工智能 企业号九月金秋榜

2022最新腾讯面经分享:Java 面试刷题 PDF(17 大专题 )

Java-fenn

Java 编程 程序员 面试 java面试

2022年面试复盘大全500道:Redis+ZK+Nginx+数据库+分布式+微服务

小小怪下士

数据库 redis 分布式 微服务 java面试

Qt|制作简单的不规则窗体

中国好公民st

qt 事件 9月月更

英特尔Wi-Fi 7速率提升5倍,为多应用场景带来改变

科技之家

开发者问第四期|统一扫码服务、机器学习服务等问题解答

HMS Core

Java 面试之技术框架

小小怪下士

Java spring 编程 程序员

Lua脚本在Redis事务中的应用实践

京东科技开发者

数据库 redis 事务 开发语言 Lua脚本

中国DevOps平台市场,华为云再次位居领导者位置

华为云开发者联盟

云计算 华为云 企业号九月金秋榜

这样Debug,排查问题效率大大提升...

程序知音

长安链ca 容器部署(解决无法访问Mysql问题)

长安链

架构师成长之路——什么是架构师

小小怪下士

Java 程序员 架构 后端

老生常谈!数据库如何存储时间?你真的知道吗?

小小怪下士

Java 数据库 编程 程序员

为什么Java中有三种基础的类加载器?

小小怪下士

Java 编程 程序员 程序

前端面试哪些是必须要掌握的

loveX001

JavaScript 前端

基于云原生技术打造全球融合通信网关

阿里云视频云

云原生 网络 通信 通信云

携程基于Quasar协程的NIO实践_开源_Ryan_InfoQ精选文章