写点什么

Java 多线程编程模式实战指南一:Active Object 模式(下)

2014 年 11 月 25 日

Active Object 模式的评价与实现考量

Active Object 模式通过将方法的调用与执行分离,实现了异步编程。有利于提高并发性,从而提高系统的吞吐率。

Active Object 模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离。任务的执行策略被封装在 Scheduler 的实现类之内,因此它对外是不“可见”的,一旦需要变动也不会影响其它代码,降低了系统的耦合性。任务的执行策略可以反映以下一些问题:

  • 采用什么顺序去执行任务,如 FIFO、LIFO、或者基于任务中包含的信息所定的优先级?
  • 多少个任务可以并发执行?
  • 多少个任务可以被排队等待执行?
  • 如果有任务由于系统过载被拒绝,此时哪个任务该被选中作为牺牲品,应用程序该如何被通知到?
  • 任务执行前、执行后需要执行哪些操作?

这意味着,任务的执行顺序可以和任务的提交顺序不同,可以采用单线程也可以采用多线程去执行任务等等。

当然,好处的背后总是隐藏着代价,Active Object 模式实现异步编程也有其代价。该模式的参与者有 6 个之多,其实现过程也包含了不少中间的处理:MethodRequest 对象的生成、MethodRequest 对象的移动(进出缓冲区)、MethodRequest 对象的运行调度和线程上下文切换等。这些处理都有其空间和时间的代价。因此,Active Object 模式适合于分解一个比较耗时的任务(如涉及 I/O 操作的任务):将任务的发起和执行进行分离,以减少不必要的等待时间。

虽然模式的参与者较多,但正如本文案例的实现代码所展示的,其中大部分的参与者我们可以利用 JDK 自身提供的类来实现,以节省编码时间。如表 1 所示。

表 1. 使用 JDK 现有类实现 Active Object 的一些参与者

参与者名称

可以借用的JDK

备注

Scheduler

Java Executor Framework 中的 java.util.concurrent.ExecutorService 接口的相关实现类,如 java.util.concurrent.ThreadPoolExecutor。

ExecutorService 接口所定义的 submit(Callable task) 方法相当于图 2 中的 enqueue 方法。

ActivationQueue

java.util.concurrent.LinkedBlockingQueue

若 Scheduler 采用 java.util.concurrent.ThreadPoolExecutor,则 java.util.concurrent.LinkedBlockingQueue 实例作为 ThreadPoolExecutor 构造器的参数。

MethodRequest

java.util.concurrent.Callable 接口的匿名实现类。

Callable 接口比起 Runnable 接口的优势在于它定义的 call 方法有返回值,便于将该返回值传递给 Future 实例。

Future

java.util.concurrent.Future

ExecutorService 接口所定义的 submit(Callable task) 方法的返回值类型就是 java.util.concurrent.Future。

错误隔离

错误隔离指一个任务的处理失败不影响其它任务的处理。每个 MethodRequest 实例可以看作一个任务。那么,Scheduler 的实现类在执行 MethodRequest 时需要注意错误隔离。选用 JDK 中现成的类(如 ThreadPoolExecutor)来实现 Scheduler 的一个好处就是这些类可能已经实现了错误隔离。而如果自己编写代码实现 Scheduler,用单个 Active Object 工作线程逐一执行所有任务,则需要特别注意线程的 run 方法的异常处理,确保不会因为个别任务执行时遇到一些运行时异常而导致整个线程终止。如清单 6 的示例代码所示。

清单 6. 自己动手实现 Scheduler 的错误隔离示例代码

复制代码
public class CustomScheduler implements Runnable {
private LinkedBlockingQueue<Runnable> activationQueue =
new LinkedBlockingQueue<Runnable>();
@Override
public void run() {
dispatch();
}
public <T> Future<T> enqueue(Callable<T> methodRequest) {
final FutureTask<T> task = new FutureTask<T>(methodRequest) {
@Override
public void run() {
try {
super.run();
// 捕获所以可能抛出的对象,避免该任务运行失败而导致其所在的线程终止。
} catch (Throwable t) {
this.setException(t);
}
}
};
try {
activationQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return task;
}
public void dispatch() {
while (true) {
Runnable methodRequest;
try {
methodRequest = activationQueue.take();
// 防止个别任务执行失败导致线程终止的代码在 run 方法中
methodRequest.run();
} catch (InterruptedException e) {
// 处理该异常
}
}
}
}

缓冲区监控

如果 ActivationQueue 是有界缓冲区,则对缓冲区的当前大小进行监控无论是对于运维还是测试来说都有其意义。从测试的角度来看,监控缓冲区有助于确定缓冲区容量的建议值(合理值)。清单 3 所示的代码,即是通过定时任务周期性地调用 ThreadPoolExecutor 的 getQueue 方法对缓冲区的大小进行监控。当然,在监控缓冲区的时候,往往只需要大致的值,因此在监控代码中要避免不必要的锁。

缓冲区饱和处理策略

当任务的提交速率大于任务的执行数率时,缓冲区可能逐渐积压到满。这时新提交的任务会被拒绝。无论是自己编写代码还是利用 JDK 现有类来实现 Scheduler,对于缓冲区满时新任务提交失败,我们需要一个处理策略用于决定此时哪个任务会成为“牺牲品”。若使用 ThreadPoolExecutor 来实现 Scheduler 有个好处是它已经提供了几个缓冲区饱和处理策略的实现代码,应用代码可以直接调用。如清单 3 的代码所示,本文案例中我们选择了抛弃最老的任务作为处理策略。java.util.concurrent.RejectedExecutionHandler 接口是 ThreadPoolExecutor 对缓冲区饱和处理策略的抽象,JDK 中提供的具体实现如表 2 所示。

表 2. JDK 提供的缓冲区饱和处理策略实现类

实现类

所实现的处理策略

ThreadPoolExecutor.AbortPolicy

直接抛出异常。

ThreadPoolExecutor.DiscardPolicy

放弃当前被拒绝的任务(而不抛出任何异常)。

ThreadPoolExecutor.DiscardOldestPolicy

将缓冲区中最老的任务放弃,然后重新尝试接纳被拒绝的任务。

ThreadPoolExecutor.CallerRunsPolicy

在任务的提交方线程中运行被拒绝的任务。

当然,对于 ThreadPoolExecutor 而言,其工作队列满不一定就意味着新提交的任务会被拒绝。当其最大线程池大小大于其核心线程池大小时,工作队列满的情况下,新提交的任务会用所有核心线程之外的新增线程来执行,直到工作线程数达到最大线程数时,新提交的任务会被拒绝。

Scheduler****空闲工作线程清理

如果 Scheduler 采用多个工作线程(如采用 ThreadPoolExecutor 这样的线程池)来执行任务。则可能需要清理空闲的线程以节约资源。清单 3 的代码就是直接使用了 ThreadPoolExecutor 的现有功能,在初始化其实例时通过指定其构造器的第 3、4 个参数( long keepAliveTime, TimeUnit unit),告诉 ThreadPoolExecutor 对于核心工作线程以外的线程若其已经空闲了指定时间,则将其清理掉。

可复用的Active Object模式实现

尽管利用 JDK 中的现成类可以极大地简化 Active Object 模式的实现。但如果需要频繁地在不同场景下使用 Active Object 模式,则需要一套更利于复用的代码,以节约编码的时间和使代码更加易于理解。清单 7 展示一段基于 Java 动态代理的可复用的 Active Object 模式的 Proxy 参与者的实现代码。

清单 7. 可复用的 Active Object 模式 Proxy 参与者实现

复制代码
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public abstract class ActiveObjectProxy {
private static class DispatchInvocationHandler implements InvocationHandler {
private final Object delegate;
private final ExecutorService scheduler;
public DispatchInvocationHandler(Object delegate,
ExecutorService executorService) {
this.delegate = delegate;
this.scheduler = executorService;
}
private String makeDelegateMethodName(final Method method,
final Object[] arg) {
String name = method.getName();
name = "do" + Character.toUpperCase(name.charAt(0))
+ name.substring(1);
return name;
}
@Override
public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
Object returnValue = null;
final Object delegate = this.delegate;
final Method delegateMethod;
// 如果拦截到的被调用方法是异步方法,则将其转发到相应的 doXXX 方法
if (Future.class.isAssignableFrom(method.getReturnType())) {
delegateMethod = delegate.getClass().getMethod(
makeDelegateMethodName(method, args),
method.getParameterTypes());
final ExecutorService scheduler = this.scheduler;
Callable<Object> methodRequest = new Callable<Object>() {
@Override
public Object call() throws Exception {
Object rv = null;
try {
rv = delegateMethod.invoke(delegate, args);
} catch (IllegalArgumentException e) {
throw new Exception(e);
} catch (IllegalAccessException e) {
throw new Exception(e);
} catch (InvocationTargetException e) {
throw new Exception(e);
}
return rv;
}
};
Future<Object> future = scheduler.submit(methodRequest);
returnValue = future;
} else {
// 若拦截到的方法调用不是异步方法,则直接转发
delegateMethod = delegate.getClass()
.getMethod(method.getName(),method.getParameterTypes());
returnValue = delegateMethod.invoke(delegate, args);
}
return returnValue;
}
}
/**
* 生成一个实现指定接口的 Active Object proxy 实例。
* 对 interf 所定义的异步方法的调用会被装发到 servant 的相应 doXXX 方法。
* @param interf 要实现的 Active Object 接口
* @param servant Active Object 的 Servant 参与者实例
* @param scheduler Active Object 的 Scheduler 参与者实例
* @return Active Object 的 Proxy 参与者实例
*/
public static <T> T newInstance(Class<T> interf, Object servant,
ExecutorService scheduler) {
@SuppressWarnings("unchecked")
T f = (T) Proxy.newProxyInstance(interf.getClassLoader(),
new Class[] { interf },
new DispatchInvocationHandler(servant, scheduler));
return f;
}
}

清单 7 的代码实现了可复用的 Active Object 模式的 Proxy 参与者 ActiveObjectProxy。ActiveObjectProxy 通过使用 Java 动态代理,动态生成指定接口的代理对象。对该代理对象的异步方法(即返回值类型为 java.util.concurrent.Future 的方法)的调用会被 ActiveObjectProxy 实现 InvocationHandler(DispatchInvocationHandler)所拦截,并转发给 ActiveObjectProxy 的 newInstance 方法中指定的 Servant 处理。

清单 8 所示的代码展示了通过使用 ActiveObjectProxy 快速 Active Object 模式。

清单 8. 基于可复用的 API 快速实现 Active Object 模式

复制代码
public static void main(String[] args) throws
InterruptedException, ExecutionException {
SampleActiveObject sao = ActiveObjectProxy.newInstance(
SampleActiveObject.class, new SampleActiveObjectImpl(),
Executors.newCachedThreadPool());
Future<String> ft = sao.process("Something", 1);
Thread.sleep(500);
System.out.println(ft.get());

从清单 8 的代码可见,利用可复用的 Active Object 模式 Proxy 实现,应用开发人员只要指定 Active Object 模式对外保留的接口(对应 ActiveObjectProxy.newInstance 方法的第 1 个参数),并提供一个该接口的实现类(对应 ActiveObjectProxy.newInstance 方法的第 2 个参数),再指定一个 java.util.concurrent.ExecutorService 实例(对应 ActiveObjectProxy.newInstance 方法的第 3 个参数)即可以实现 Active Object 模式。

总结

本文介绍了 Active Object 模式的意图及架构。并提供了一个实际的案例用于展示使用 Java 代码实现 Active Object 模式,在此基础上对该模式进行了评价并分享了在实际运用该模式时需要注意的事项。

参考资源


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2014 年 11 月 25 日 10:019075

评论

发布
暂无评论
发现更多内容

作业一:一致性hash实现

ruettiger

大型网站技术架构--架构篇

wei

【获奖名单公示】仅需发布3篇+文章,极客时间每日一课 VIP 等多重礼品,免费拿~

InfoQ写作平台官方

写作平台 征稿 活动专区

架构师训练营」第 5 周作业

edd

极客大学架构师训练营

week5 学习总结

任小龙

架构师第五周作业

suke

极客大学架构师训练营

Istio 升级新方式:金丝雀升级

郭旭东

Kubernetes 云原生 istio

架构0期Week5Work1

Nan Jiang

致那些高考结束的同学们

小天同学

读书 读书感悟 高考

架构师第五周总结

suke

极客大学架构师训练营

十代酷睿凌云!开启游戏本新篇章的机械师“战空”F117-V

飞天鱼2017

分布式事务精华总结篇

古月木易

分布式 分布式事务

技术选型:如何构建技术选型方法论

NORTH

极客大学架构师训练营 技术选型

第5周作业

andy

消息队列与异步架构

Lane

极客大学架构师训练营

一致性Hash

行下一首歌

极客大学架构师训练营

分布式事务精华总结篇

奈学教育

分布式 分布式事务

Lesson 5 分布式系统架构- 分布式缓存和队列 心得笔记

edd

单体架构知识点及单体架构的缺陷

奈学教育

单体架构

第五周作业 - 一致性 hash 实现

netbanner

极客大学架构师训练营

架构师训练营第五周作业

James-Pang

极客大学架构师训练营

架构师训练营第5周总结:缓存,消息队列,负载均衡,分布式数据库

hifly

负载均衡 缓存 分布式数据库 极客大学架构师训练营 消息队列

架构师训练营第五周总结

架构师 极客大学架构师训练营

架构师训练营第五周学习总结

whiter

极客大学架构师训练营

第五周作业总结

Thrine

啃碎并发(六):Java线程同步与实现

猿灯塔

单体架构知识点及单体架构的缺陷

古月木易

单体架构

Week 05 总结

鱼_XueTr

缓存 分布式数据库 消息队列

RxJS学习总结

shinji

RXJS

第五周学习总结

李白

缓存技术和直播平台缓存总结

周冬辉

InfoQ 极客传媒开发者生态共创计划线上发布会

InfoQ 极客传媒开发者生态共创计划线上发布会

Java多线程编程模式实战指南一:Active Object模式(下)-InfoQ