写点什么

戏(细)说 Executor 框架线程池任务执行全过程(下)

  • 2015-06-04
  • 本文字数:5448 字

    阅读完需:约 18 分钟

上一篇文章中通过引入的一个例子介绍了在Executor 框架下,提交一个任务的过程,这个过程就像我们老大的老大要找个老大来执行一个任务那样简单。并通过剖析ExecutorService 的一种经典实现ThreadPoolExecutor 来分析接收任务的主要逻辑,发现ThreadPoolExecutor 的工作思路和我们带项目的老大的工作思路完全一致。在本文中我们将继续后面的步骤,着重描述下任务执行的过程和任务执行结果获取的过程。会很容易发现,这个过程我们更加熟悉,因为正是每天我们工作的过程。除了ThreadPoolExecutor 的内部类Worker 外,对执行内容和执行结果封装的FutureTask 的表现是这部分着重需要了解的。

为了连贯期间,内容的编号延续上篇。

2. 任务执行

其实应该说是任务被执行,任务是宾语。动宾结构:execute the task,执行任务,无论写成英文还是中文似乎都是这样。那么主语是是 who 呢?明显不是调用 submit 的那位(线程),那是哪位呢?上篇介绍 ThreadPoolExecutor 主要属性时提到其中有一个 HashSet workers 的集合,我们有说明这里存储的就是线程池的工作队列的集合,队列的对象是 Worker 类型的工作线程,是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口:

复制代码
private final class Worker implements Runnable

8) 看作业线程干什么当然是看它的 run 方法在干什么。如我们所料,作业线程就是在一直调用 getTask 方法获取任务,然后调用 runTask(task) 方法执行任务。看到没有,是在 while 循环里面,就是不干完不罢休的意思!在加班干活的苦逼的朋友们,有没有遇见战友的亲切感觉?

复制代码
public void run() {
try {
Runnable task = firstTask;
// 循环从线程池的任务队列获取任务
while (task != null || (task = getTask()) != null) {
// 执行任务
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

然后简单看下 getTask 和 runTask(task) 方法的内容。

9) getTask 方法是 ThreadPoolExecutor 提供给其内部类 Worker 的的方法。作用就是一个,从任务队列中取任务,源源不断地输出任务。有没有想到老大手里拿的总是满满当当的 project,也是源源不断的。

复制代码
Runnable getTask() {
for (;;) {
// 从任务队列的头部取任务
r = workQueue.take();
return r;
}
}

10) runTask(Runnable task) 是工作线程 Worker 真正处理拿到的每个具体任务。看到这里才可用确认我们的猜想,之前提到 [y1] 的“执行任务”这个动宾结构前面的主语正是这些 Worker 呀。唠叨了半天(看主要方法都看到了整整第 10 个了),前面都是派活,这里才是干活。和我们的工作何其相似!老大(LD),老大的老大(LD^2),老大的老大(LD^n) 非常辛苦,花了很多时间、精力在会议室、在 project 上想着怎么生成和安排任务,然而真的轮到咱哥们干活,可能花了不少时间,但看看流程就是这么简单。三个大字:“Just do it”。

复制代码
private void runTask(Runnable task) {
// 调用任务的 run 方法,即在 Worker 线程中执行 Task 内定义内容。
task.run();
}

需要注意的地方出现了,调用的其实是 task 的 run 方法。看下 FutureTask 的 run 方法做了什么事情。

这里插入一个 FutureTask 的类图。可以看到 FutureTask 实现了 RunnableFuture 接口,所以 FutureTask 即有 Runnable 接口的 run 方法来定义任务内容,也有 Future 接口中定义的 get、cancel 等方法来控制任务执行和获取执行结果。Runnable 接口自不用说,Future 接口的伟大设计,就是使得实现该接口的对象可以阻塞线程直到任务执行完毕,也可以取消任务执行,检测任务是执行完毕还是被取消了。想想在之前我们使用 Thread.join() 或者 Thread.join(long millis) 等待任务结束是多么苦涩啊。

FutureTask 内部定义了一个 Sync 的内部类,继承自 AQS,来维护任务状态。关于 AQS 的设计思路,可以参照参考 Doug Lea 大师的原著 The java.util.concurrent Synchronizer Framework

(点击放大图像)

11) 和其他的同步工具类一样,FutureTask 的主要工作内容也是委托给其定义的内部类 Sync 来完成。

复制代码
public void run() {
// 调用 Sync 的对应方法
sync.innerRun();
}

12) FutureTask.Sync.innerRun(),这样做的目的就是为了维护任务执行的状态,只有当执行完后才能够获得任务执行结果。在该方法中,首先设置执行状态为 RUNNING 只有判断任务的状态是运行状态,才调用任务内封装的回调,并且在执行完成后设置回调的返回值到 FutureTask 的 result 变量上。在 FutureTask 中,innerRun 等每个“写”方法都会首先修改状态位,在后续会看到 innerGet 等“读”方法会先判断状态,然后才能决定后续的操作是否可以继续。下图是 FutureTask.Sync 中几个重要状态的流转情况,和其他的同步工具类一样,状态位使用的也是父类 AQS 的 state 属性。

(点击放大图像)

复制代码
void innerRun() {
// 通过对 AQS 的状态位 state 的判断来判断任务的状态是运行状态,则调用任务内封装的回调,并且设置回调的返回值
if (getState() == RUNNING)
innerSet(callable.call());
}
void innerSet(V v) {
for (;;) {
int s = getState();
// 设置运行状态为完成,并且把回调额执行结果设置给 result 变量
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}

至此工作线程执行 Task 就结束了。提交的任务是由 Worker 工作线程执行,正是在该线程上调用 Task 中定义的任务内容,即封装的 Callable 回调,并设置执行结果。下面就是最重要的部分:调用者如何获取执行的结果。让你加班那么久,总得把成果交出来吧。老大在等,因为老大的老大在等!

3. 获取执行结果

前面说过,对于老大的老大这样的使用者来说,获取执行结果这个过程总是最容易的事情,只需调用 FutureTask 的 get() 方法即可。该方法是在 Future 接口中就定义的。get 方法的作用就是等待执行结果。(Waits if necessary for the computation to complete, and then retrieves its result.)Future 这个接口命名得真好,虽然是在未来,但是定义有一个 get() 方法,总是“可以掌控的未来,总是有收获的未来!”实现该接口的 FutureTask 也应该是这个意思,在未来要完成的任务,但是一样要有结果哦。

13) FutureTask 的 get 方法同样委托给 Sync 来执行。和该方法类似,还有一个 V get(long timeout, TimeUnit unit),可以配置超时时间。

复制代码
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

14) 在 Sync 的 innerGet 方法中,调用 AQS 父类定义的获取共享锁的方法 acquireSharedInterruptibly 来等待执行完成。如果执行完成了则可以继续执行后面的代码,返回 result 结果,否则如果还未完成,则阻塞线程等待执行完成。 [bd2] 再大的老大要想获得结果也得等老子干完了才行!可以看到调用 FutureTask 的 get 方法,进而调用到该方法的一定是想要执行结果的线程,一般应该就是提交 Task 的线程,而这个任务的执行是在 Worker 的工作线程上,通过 AQS 来保证执行完毕才能获取执行结果。该方法中 acquireSharedInterruptibly 是 AQS 父类中定义的获取共享锁的方法,但是到底满足什么条件可以成功获取共享锁,这是 Sync 的 tryAcquireShared 方法内定义的。 [bd3] 具体说来,innerIsDone 用来判断是否执行完毕,如果执行完毕则向下执行,返回 result 即可;如果判断未完成,则调用 AQS 的 doAcquireSharedInterruptibly 来挂起当前线程,一直到满足条件。这种思路在其他的几种同步工具类 Semaphore CountDownLatch ReentrantLock ReentrantReadWriteLock 也广泛使用。借助 AQS 框架,在获取锁时,先判断当前状态是否允许获取锁,若是允许则获取锁,否则获取不成功。获取不成功则会阻塞,进入阻塞队列。而释放锁时,一般会修改状态位,唤醒队列中的阻塞线程。每个同步工具类的自定义同步器都继承自 AQS 父类,是否可以获取锁根据同步类自身的功能要求覆盖 AQS 对应的 try 前缀方法,这些方法在 AQS 父类中都是只有定义没有内容。可以参照《源码剖析 AQS 在几个同步工具类中的使用》来详细了解。

突然想到想想那些被称为老大的,是不是整个 career 流程就是只干两件事情:submit a task, then wait and get the result。不对,还有一件事情,不是等待,而是催。“完了没,完了没?schedule 很紧的,抓点紧啊,要不要适当加点班啊……”

复制代码
V innerGet() throws InterruptedException, ExecutionException {
// 获得锁,表示执行完毕,才能获得后执行结果,否则阻塞等待执行完成再获取执行结果
acquireSharedInterruptibly(0);
return result;
}
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

至此,获得执行结果,圆满完成任务!

老大的老大,拍着咱们老大的肩膀(或者深情的抚摸着咱们老大唏嘘胡茬的脸庞)说:“亲,你这活干的漂亮!”而隔壁桌座位的几个兄弟,刚熬了几个晚上加班交付完这波 task 后,发现任务队列里又有新任务了,俺们老大又从他的另外一个老大手里接来的任务了。每个人都按照这样的角色进行着,依照这样的角色安排和谐愉快地进行着。。。

角色名

任务用户

任务管理者

任务执行者

角色属性

任务的甲方

任务的乙方

乙方的工具

角色说明

选择合适的任务执行服务,如可以根据需要选择 ThreadPoolExecutor 还是 ScheduledThreadPoolExecutor,并定制 ExecutorService 的配置。
定义好任务的工作内容和结果类型,提交任务,等待任务的执行结果

接收提交的任务;
维护执行服务内部管理;
配置工作线程执行任务

每个工作线程一直从任务执行服务获取待执行的任务,保证任务完成后返回执行结果。

Executor**** 中对应

创建获取 ExecutorService、并提交 Task 的外部接口

ExecutorService 的各种实现。如经典的 ThreadPoolExecutor,ScheduledThreadPoolExecutor

执行服务内定义的配套的 Worker 线程。如 ThreadPoolExecutor.Worker

主要接口方法

submit(Callable task)

execute(Runnable command)

runTask(Runnable task)

现实角色映射

手里有活的大老大

领人干活的老大

真正干活的码农

主要工作伪代码

taskService = createService()
future=taskService.submitTask()
future.get()

executeTask()
{ addTask()
createThread()
}

while(ture) {
getTask()
runTask()
}

四、 总结

从时序图上看主要的几个角色是这样配合完成任务提交、任务执行、获取执行结果这几个步骤的。

(点击放大图像)

  1. 外面需要提交任务的角色(如例子中老大的老大),首先创建一个任务执行服务 ExecutorService,一般使用工具类 Executors 的若干个工厂方法 创建不同特征的线程池 ThreadPoolExecutor,例子中是使用 newFixedThreadPool 方法创建有 n 个固定工作线程的线程池。
  2. 线程池是专门负责从外面接活的老大。把任务封装成一个 FutureTask 对象,并根据输入定义好要获得结果的类型,就可以 submit 任务了。
  3. 线程池就像我们团队里管人管项目的老大,各个都有一套娴熟、有效的办法来对付输入的任务和手下干活的兄弟一样,内部有一套比较完整、细致的任务管理办法,工作线程管理办法,以便应付输入的任务。这些逻辑全部在其 execute 方法中体现。
  4. 线程池接收输入的 task,根据需要创建工作线程,启动工作线程来执行 task。
  5. 工作线程在其 run 方法中一直循环,从线程池领取可以执行的 task,调用 task 的 run 方法执行 task 内定义的任务。
  6. FutureTask 的 run 方法中调用其内部类 Sync 的 innerRun 方法来执行封装的具体任务,并把任务的执行结果返回给 FutureTask 的 result 变量。
  7. 当提及任务的角色调用 FutureTask 的 get 方法获取执行结果时,Sync 的 innerGet 方法被调用。根据任务的执行状态判断,任务执行完毕则返回执行结果;未执行完毕则等待。

还记得我们费了半天劲试图找出任务执行时那个动宾结构的主语吗?从示例上看更像是线程池在向外提供任务执行的服务。就像我们的老大在代表我们接收任务、执行任务、提交执行结果。明显我们这些真正的 Worker 成了延伸,有点搞不懂到底我们是主语,还是主语延伸的工具,就像定义 ThreadPoolExecutor 的内部类 Worker 一样。我们只是工具,不是主语,是状语: execute the task by workers。突然想到毛主席当年的“数风流人物,还看今朝”,说的应该是这些 Worker 的劳苦大众吧,怎么都今朝这么久了,俺们这些 Woker 们还是风流不起来呢?风骚的作者居然在上面严肃的时序图上加了个风骚的小星星,向同行的 Worker 们致敬!

作者简介

张超盟,an ExTrender,‍CS 数据管理方向工学硕士。与妻儿蜗居于钱江畔,就职一初创安全公司任数据服务团队负责人,做数据 (存储、挖掘、服务) 方面研发。爱数据,爱代码,爱技术,爱豆吧!( idouba.net )。


感谢徐川对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-04 12:3711958

评论

发布
暂无评论
发现更多内容

Http轮询分为长查询和短查询总结

知识浅谈

HTTP 9月月更

跟我学Python图像处理丨带你掌握傅里叶变换原理及实现

华为云开发者联盟

Python 人工智能 企业号九月金秋榜

Code For Better 谷歌开发者之声——Google Cloud谷歌云

Fire_Shield

云原生 Google Cloud 9月月更

NestOS应用案例:容器化部署OpenStack

openEuler

架构 openEuler 开源操作系统 OpenStack

前端面试经常被问的题目,自己总结了一下

loveX001

JavaScript 前端

Serverless遇到 FinOps: Economical Serverless

华为云开发者联盟

云原生 后端 企业号九月金秋榜

死锁检测实现

C++后台开发

后台开发 线程 多线程 死锁 C++开发

开发者有话说|一名普通大专学历开发者的成长

彭发红

openEuler资源利用率提升之道 03:rubik混部引擎简介

openEuler

Linux 开源 cpu 操作系统 openEuler

大数据调度平台Airflow(七):Airflow分布式集群搭建原因及其他扩展

Lansonli

airflow 9月月更

物联网实践分享

彭发红

开发者有话说|如何写出更加优雅的代码

闫同学

个人成长

TCP协议和UDP协议详细介绍

阿柠xn

TCP 计算机网络 协议族 UDP协议 9月月更

深度剖析Istio共享代理新模式Ambient Mesh

华为云开发者联盟

云计算 云原生 后端 企业号九月金秋榜

NFTScan 与 ET.XYZ 在 NFT API 数据层面进行深度合作

NFT Research

区块链 NFT web3

探索AI技术应用场景

felix

产业落地 AI探索 API接口 模型管理

这些react面试题你会吗,反正我回答的不好

beifeng1996

前端 React

js常见手写题总结

helloworld1024fd

JavaScript 前端

SpringBoot初识

十八岁讨厌编程

Java 后端开发 9月月更

流程图布局在项目中的实践

相续心

关爱2700多万听障者,手语服务助力无声交流

HarmonyOS SDK

手语

Struts实现登录

Struts2 9月月更

[SpringBoot]配置文件格式、yaml配置及读取

十八岁讨厌编程

Java 9月月更

react20道高频面试题答案总结

beifeng1996

前端 React

19道高频vue面试题,顺便写一下自己的答案

bb_xiaxia1998

Vue 前端

js高频手写题总结

helloworld1024fd

JavaScript 前端

融云员工服务台,跟“干不完”说再见

融云 RongCloud

IT职场

关于 Angular 应用 tsconfig.json 中的 lib 属性

汪子熙

typescript 前端开发 angular web开发 9月月更

[SpringBoot]多环境配置,配置文件分类

十八岁讨厌编程

Java 后端开发 9月月更

工赋开发者社区 |【数智化】数字化工厂规划与建设方案

工赋开发者社区

VUE v-bind 数据绑定

HoneyMoose

戏(细)说Executor框架线程池任务执行全过程(下)_Java_张超盟_InfoQ精选文章