你在使用哪种编程语言?快来投票,亲手选出你心目中的编程语言之王 了解详情
写点什么

携程基于 Quasar 协程的 NIO 实践

2020 年 9 月 03 日

携程基于Quasar协程的NIO实践

IO 密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA 上成熟的非阻塞 IO(NIO)技术可解决该问题。目前 Java 项目对接 NIO 的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来 Golang、Kotlin 等语言的协程(Coroutine)能达到高性能与可读性的兼顾。


本文利用开源的 Quasar 框架提供的协程对系统进行 NIO 改造,解决以下两个问题:


1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。


2)使用更轻量的协程同步等待 IO,替代处理 NIO 常用的异步回调。


一、Java 异步编程与非阻塞 IO

本文改造的系统处理来自前台的任务,通过 HTTP 请求对端服务,还通过 RPC 调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。



基于 epoll 的 NIO 框架 Netty 在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理 IO 的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。


1.1 Java 中的异步工具

Java 项目大多使用 JDK8,除线程外可以获得的异步的编程支持包括 CompletableFuture,以及开源的 RxJava、Vert.x 等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。


如下为将一段简单的逻辑判断使用 CompletableFuture 进行异步改造后的对比。原始版本使用 getA 方法获得第一步的请求结果,根据其相应选择使用 getB1 还是 getB2 获取第二步的响应作为结果。


HttpResponse a = getA();
HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();}
String ans=b.getBody();
复制代码


首先将三个获取响应的方法改为异步。此处假设 getB1 与 getB2 内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。


private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();
复制代码


然后使用 CompletableFuture 的链式调用,将两个步骤组合起来:


String ans = getA()        .thenCompose(a -> {            if (a.getBody().equals("1")) {                return getB1();            } else {                return getB2();            }        }).get()        .getBody();
复制代码


使用 CompletableFuture 的链式回调后,代码变得不友好。RxJava 等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于 if/else、switch/case,乃至 while/for、break/continue 这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO 使用。当时使用 NIO 时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。


1.2 协程

协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。


和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。



协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。


1.3 Quasar 任务调度原理

Quasar(https://github.com/puniverse/quasar)是一个开源的 Java 协程框架,通过利用 Java instrument 技术对字节码进行修改,使方法挂起前后可以保存和恢复 JVM 栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为 initial 初始化,第二部为通过 NIO 获取网络响应。


public String instrumentDemo(){    initial();    String ans = getFromNIO();    return ans;}
复制代码



Quasar 会在 initial 前增加一个 flag 字段,表明当前方法执行的位置。第一次执行方法时,检查到 flag 为 0,修改 flag 为 1 并继续往下执行 initial 方法。执行 getFromNIO 方法前插入字节码指令将栈帧中的数据全部保存在一个 Quasar 自定义的栈结构中,在执行 getFromNIO 后,挂起协程,让出线程资源。直至 NIO 异步完成后,协程调度器将第二次执行该方法,检测到 flag 为 1,将会调用 jump 指令跳转到 returnans 语句前,并将保存的栈结构还原到当前栈中,最后调用人 return ans 语句,方法执行完毕。


二、系统异步 IO 改造

在项目中添加 Quasar 依赖后,可以使用 Fiber 类新建协程。建立的方法与线程类似。


new Fiber(()->{    //方法体}).start();
复制代码


2.1 整合 Netty 与 Quasar

系统使用的 Http 框架是基于 Netty 的 async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和 CompletableFuture 两种对响应的异步处理方式。


CompletableFuture 自 JDK8 推出,与之前的 Future 类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在 CompletableFuture 注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr 框架对它也做了支持,提供了 API 用于在协程中等待 CompletableFuture 的结果。调用后,协程将挂起,直至 future 状态为已完成。


AsyncCompletionStage.get(future)
复制代码


通过 CompletableFuture 作为通知中介,我们可以将 AsyncHttpClient 与 Quasar 做整合,挂起协程等待 IO 结果。


//创建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//创建请求Request request = createRequest();//将网络请求交给HttpClient执行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通过Quasar挂起协程Response response = AsyncCompletionStage.get(future);//获取网络结果后,通过future传递response并唤醒协程重新执行deal(response);
复制代码


过程可由下图表示。



Quasar 框架 AsyncCompletionStage.get 内部完成的工作相当于,在 HttpClient 返回的 future 上注册回调,回调的内容是“IO 操作完成后通知调度器唤醒协程”,这样将 NIO 异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。


2.2 声明挂起方法

Quasar 需要织入字节码接管挂起方法的调度,在项目主 pom 下添加 quasar-maven-plugin 插件,该插件将在编译后的 class 文件中修改字节码。


<plugin>    <groupId>com.vlkan</groupId>    <artifactId>quasar-maven-plugin</artifactId>    <version>0.7.9</version>    <executions>        <execution>            <goals>                <goal>instrument</goal>            </goals>        </execution>    </executions></plugin>
复制代码


Quasar 通过识别方法是否抛出了该框架定义的 SuspendExecution 异常决定是否修改字节码。Quasar 框架在 AsyncCompletionStage.get 方法上声明了 SuspendExceution 异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在 try/catch 语句时,也必须抛出该异常。


public void startFiber() throws ExecutionException, InterruptedException {    Fiber<Void> fiber = new Fiber<Void>(() -> {        //不用继续抛出异常        Response response = waitNextLayer1();        deal(response);    }).start();}
private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();}
private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具类抛出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


2.3 异步 RPC 调用

目前主流的 RPC 框架都基于 NIO 实现,支持异步回调,有的 RPC 框架已经直接提供了返回 CompletableFuture 或 ListenableFuture(Guava 工具类提供)的异步接口,通过使用 ComplatableFuture,可以按前文类似的方法将 Quasar 与 RPC 框架结合起来。当 RPC 框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:


interface Callback<TResponse> {    void callback(TResponse TResponse, Exception e);}
复制代码


这种情况,可以使用者自己创建 ComplatableFuture,在回调中设置其状态,并调用 AsyncCompletionStage.get 等待这个 future。


CompletableFuture<Response> future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback<Response>() {    public void callback(Response response, Exception e) {        if (e == null) future.complete(response);        else future.completeExceptionally(e);    }});//在此处调用Quasar的API,挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);
复制代码


上述代码依然具有异步回调不直观的缺点,通过 JDK8 的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。


@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> {    void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {    CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); });
try { //使用Quasar等待Future结果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


最后的调用可简化一行代码,该方法适用于所有该 Rpc 框架提供的异步接口。


Response response= waitRpc(new RpcClient()::helloAsync, request);
复制代码


2.4 阻塞操作的处理

Quasar 协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞 IO 的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。


public void waitBlocking() throws SuspendExecution {    //从DB获取结果    String ans = waitBlocking(this::selectFromDB);}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); });
try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


2.5 并发工具的使用

协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在 synchronized 同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现 IllegalMonitorStateException 异常。



但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。


JDK 并发包中的工具可分为两类,一类是 Lock、Semaphore、CountDownLatch 等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如 await 时,可能导致协程的执行线程中发生阻塞。


三、总结

系统运行在 4 核心的主机上,线程池构成如下。



业务逻辑运行在 Quasar 的协程调度线程池中,线程池大小为 CPU 核数。HTTP 请求与 RPC 调用均通过内部的 NIO 线程池管理。此外定义了一个 core size 为 8 的可伸缩的线程池用于少量消息队列、DB 等阻塞 IO 的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。


改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而 CPU 利用率也从平均 5%以下提升至 10%-60%,在瞬时与高峰流量下能保持稳定。集群 CPU 核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至 1/5。


3.1 限制与风险

Quasar 协程不是 Java 的语言标准,没有 JVM 层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。


代码的 try/catch 时可能同时捕获 SuspendExecution 异常,从而忘记标记方法,此方法字节码不会被修改,结合 Quasar 的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加 SuspendExecution 标记。


在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread 的构造方法中传入的是 Runnable 接口对象,其 run 方法没有声明 SuspendExecution 异常,run 内部的语句不会被织入字节码,造成上述异常。


3.2 总结与展望

协程使得 NIO 能够更好地应用在 Java 中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。


异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在 2018 年创建了 Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在 JVM 上实现轻量级的线程,并解除 JVM 线程与内核线程的映射。相信会给 Java 生态带来巨大的改变。


作者介绍


Ryan,携程 Java 开发工程师,对高并发、网络编程等领域有浓厚兴趣。


本文转载自公众号携程技术(ID:ctriptech)。


原文链接


携程基于Quasar协程的NIO实践


2020 年 9 月 03 日 10:041340

评论 1 条评论

发布
用户头像
c# 的await模型不香吗
2020 年 09 月 04 日 09:00
回复
没有更多了
发现更多内容

2021最新分享:阿里内部总监手码的“Redis学习手册”风靡全网

比伯

Java 编程 程序员 架构 面试

WebRTC 音视频同步原理与实现

阿里云视频云

阿里云 音视频 WebRTC 流媒体

半个多月时间4面阿里,已经成功拿下offer,分享一下个人面经

Java架构之路

Java 程序员 架构 面试 编程语言

十四五,鹏城应作先锋看,山河同襄智能体

脑极体

one day

旭陽

OCE等你加入

滴滴云

云计算 私有云 滴滴夜莺 Obsuite

全靠这份阿里大佬的“Java进阶面试手册”收获蚂蚁offer

周老师

Java 编程 程序员 架构 面试

SRS流媒体服务器源码分析--RTMP消息play

赖猫

音视频 流媒体 SRS 流媒体开发

Apache Ranger的部署安装

大数据技术指南

大数据 3月日更

不吹不黑聊中台

小谢同学

云计算 中台 企业架构

Kafka 架构中 ZooKeeper 以怎样的形式存在?

码农架构

Java 消息中间件

接口测试--apipost接口断言详解

测试人生路

接口

芯翌科技领跑NIST-FRVT戴口罩人脸识别评测,助力后疫情时代科技创新

朋湖网

LDAP身份认证管理最佳实践

龙归科技

服务器 ldap 客户端

拼多多五面面经(Java岗),全面涵盖Java基础到高并发级别

Java架构之路

Java 程序员 架构 面试 编程语言

音视频之opengl渲染图片

赖猫

音视频

翻译:《实用的Python编程》04_02_Inheritance

codists

Python 继承 inheritance

容器 & 服务:K8s 与 Docker 应用集群 (四)

程序员架构进阶

Docker Kubernetes 容器技术 28天写作 3月日更

职场求生攻略答疑篇之 5 —— 我,程序员,非常焦虑

臧萌

职场 成长

Linux内核 设备树操作常用API

赖猫

Linux Linux内核

Linux/Centos Epoll 原理解析

赖猫

Linux 高并发 epoll

Alluxio 助力 Kubernetes,加速云端深度学习

阿里巴巴云原生

人工智能 大数据 容器 云原生 k8s

收藏!这些 IDE 使用技巧,你都知道吗

阿里巴巴云原生

Java ide 云原生 API 调度

Go语言学习笔记:抓取XKCD中文站的漫画

worry

go

2021金三银四必备:“基础-中级-高级”Java程序员面试复习路线

比伯

Java 编程 程序员 架构 面试

JAVA已经呈饱和趋势了吗?

cdhqyj

Java 程序员 工作 IT

分布式事务与解决方案

一个大红包

28天挑战 3月日更

深入理解Linux内核 RCU 机制

赖猫

Linux linux编程 Linux内核

新人报道

shun123456789

女神营业!云通信产品运营带你玩转号码隐私保护:网约车、外卖等O2O行业的最佳实践

阿里云Edge Plus

云通信 通信云

话说cas

木子的昼夜

云原生场景下企业API 网关选型及落地实践

云原生场景下企业API 网关选型及落地实践

携程基于Quasar协程的NIO实践-InfoQ