Active Object 模式的评价与实现考量
Active Object 模式通过将方法的调用与执行分离,实现了异步编程。有利于提高并发性,从而提高系统的吞吐率。
Active Object 模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离。任务的执行策略被封装在 Scheduler 的实现类之内,因此它对外是不“可见”的,一旦需要变动也不会影响其它代码,降低了系统的耦合性。任务的执行策略可以反映以下一些问题:
- 采用什么顺序去执行任务,如 FIFO、LIFO、或者基于任务中包含的信息所定的优先级?
- 多少个任务可以并发执行?
- 多少个任务可以被排队等待执行?
- 如果有任务由于系统过载被拒绝,此时哪个任务该被选中作为牺牲品,应用程序该如何被通知到?
- 任务执行前、执行后需要执行哪些操作?
这意味着,任务的执行顺序可以和任务的提交顺序不同,可以采用单线程也可以采用多线程去执行任务等等。
当然,好处的背后总是隐藏着代价,Active Object 模式实现异步编程也有其代价。该模式的参与者有 6 个之多,其实现过程也包含了不少中间的处理:MethodRequest 对象的生成、MethodRequest 对象的移动(进出缓冲区)、MethodRequest 对象的运行调度和线程上下文切换等。这些处理都有其空间和时间的代价。因此,Active Object 模式适合于分解一个比较耗时的任务(如涉及 I/O 操作的任务):将任务的发起和执行进行分离,以减少不必要的等待时间。
虽然模式的参与者较多,但正如本文案例的实现代码所展示的,其中大部分的参与者我们可以利用 JDK 自身提供的类来实现,以节省编码时间。如表 1 所示。
表 1. 使用 JDK 现有类实现 Active Object 的一些参与者
参与者名称
可以借用的JDK类
备注
Scheduler
Java Executor Framework 中的 java.util.concurrent.ExecutorService 接口的相关实现类,如 java.util.concurrent.ThreadPoolExecutor。
ExecutorService 接口所定义的 submit(Callable
ActivationQueue
java.util.concurrent.LinkedBlockingQueue
若 Scheduler 采用 java.util.concurrent.ThreadPoolExecutor,则 java.util.concurrent.LinkedBlockingQueue 实例作为 ThreadPoolExecutor 构造器的参数。
MethodRequest
java.util.concurrent.Callable 接口的匿名实现类。
Callable 接口比起 Runnable 接口的优势在于它定义的 call 方法有返回值,便于将该返回值传递给 Future 实例。
Future
java.util.concurrent.Future
ExecutorService 接口所定义的 submit(Callable
错误隔离
错误隔离指一个任务的处理失败不影响其它任务的处理。每个 MethodRequest 实例可以看作一个任务。那么,Scheduler 的实现类在执行 MethodRequest 时需要注意错误隔离。选用 JDK 中现成的类(如 ThreadPoolExecutor)来实现 Scheduler 的一个好处就是这些类可能已经实现了错误隔离。而如果自己编写代码实现 Scheduler,用单个 Active Object 工作线程逐一执行所有任务,则需要特别注意线程的 run 方法的异常处理,确保不会因为个别任务执行时遇到一些运行时异常而导致整个线程终止。如清单 6 的示例代码所示。
清单 6. 自己动手实现 Scheduler 的错误隔离示例代码
public class CustomScheduler implements Runnable { private LinkedBlockingQueue<Runnable> activationQueue = new LinkedBlockingQueue<Runnable>(); @Override public void run() { dispatch(); } public <T> Future<T> enqueue(Callable<T> methodRequest) { final FutureTask<T> task = new FutureTask<T>(methodRequest) { @Override public void run() { try { super.run(); // 捕获所以可能抛出的对象,避免该任务运行失败而导致其所在的线程终止。 } catch (Throwable t) { this.setException(t); } } }; try { activationQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return task; } public void dispatch() { while (true) { Runnable methodRequest; try { methodRequest = activationQueue.take(); // 防止个别任务执行失败导致线程终止的代码在 run 方法中 methodRequest.run(); } catch (InterruptedException e) { // 处理该异常 } } } }
缓冲区监控
如果 ActivationQueue 是有界缓冲区,则对缓冲区的当前大小进行监控无论是对于运维还是测试来说都有其意义。从测试的角度来看,监控缓冲区有助于确定缓冲区容量的建议值(合理值)。清单 3 所示的代码,即是通过定时任务周期性地调用 ThreadPoolExecutor 的 getQueue 方法对缓冲区的大小进行监控。当然,在监控缓冲区的时候,往往只需要大致的值,因此在监控代码中要避免不必要的锁。
缓冲区饱和处理策略
当任务的提交速率大于任务的执行数率时,缓冲区可能逐渐积压到满。这时新提交的任务会被拒绝。无论是自己编写代码还是利用 JDK 现有类来实现 Scheduler,对于缓冲区满时新任务提交失败,我们需要一个处理策略用于决定此时哪个任务会成为“牺牲品”。若使用 ThreadPoolExecutor 来实现 Scheduler 有个好处是它已经提供了几个缓冲区饱和处理策略的实现代码,应用代码可以直接调用。如清单 3 的代码所示,本文案例中我们选择了抛弃最老的任务作为处理策略。java.util.concurrent.RejectedExecutionHandler 接口是 ThreadPoolExecutor 对缓冲区饱和处理策略的抽象,JDK 中提供的具体实现如表 2 所示。
表 2. JDK 提供的缓冲区饱和处理策略实现类
实现类
所实现的处理策略
ThreadPoolExecutor.AbortPolicy
直接抛出异常。
ThreadPoolExecutor.DiscardPolicy
放弃当前被拒绝的任务(而不抛出任何异常)。
ThreadPoolExecutor.DiscardOldestPolicy
将缓冲区中最老的任务放弃,然后重新尝试接纳被拒绝的任务。
ThreadPoolExecutor.CallerRunsPolicy
在任务的提交方线程中运行被拒绝的任务。
当然,对于 ThreadPoolExecutor 而言,其工作队列满不一定就意味着新提交的任务会被拒绝。当其最大线程池大小大于其核心线程池大小时,工作队列满的情况下,新提交的任务会用所有核心线程之外的新增线程来执行,直到工作线程数达到最大线程数时,新提交的任务会被拒绝。
Scheduler****空闲工作线程清理
如果 Scheduler 采用多个工作线程(如采用 ThreadPoolExecutor 这样的线程池)来执行任务。则可能需要清理空闲的线程以节约资源。清单 3 的代码就是直接使用了 ThreadPoolExecutor 的现有功能,在初始化其实例时通过指定其构造器的第 3、4 个参数( long keepAliveTime, TimeUnit unit),告诉 ThreadPoolExecutor 对于核心工作线程以外的线程若其已经空闲了指定时间,则将其清理掉。
可复用的Active Object模式实现
尽管利用 JDK 中的现成类可以极大地简化 Active Object 模式的实现。但如果需要频繁地在不同场景下使用 Active Object 模式,则需要一套更利于复用的代码,以节约编码的时间和使代码更加易于理解。清单 7 展示一段基于 Java 动态代理的可复用的 Active Object 模式的 Proxy 参与者的实现代码。
清单 7. 可复用的 Active Object 模式 Proxy 参与者实现
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public abstract class ActiveObjectProxy { private static class DispatchInvocationHandler implements InvocationHandler { private final Object delegate; private final ExecutorService scheduler; public DispatchInvocationHandler(Object delegate, ExecutorService executorService) { this.delegate = delegate; this.scheduler = executorService; } private String makeDelegateMethodName(final Method method, final Object[] arg) { String name = method.getName(); name = "do" + Character.toUpperCase(name.charAt(0)) + name.substring(1); return name; } @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { Object returnValue = null; final Object delegate = this.delegate; final Method delegateMethod; // 如果拦截到的被调用方法是异步方法,则将其转发到相应的 doXXX 方法 if (Future.class.isAssignableFrom(method.getReturnType())) { delegateMethod = delegate.getClass().getMethod( makeDelegateMethodName(method, args), method.getParameterTypes()); final ExecutorService scheduler = this.scheduler; Callable<Object> methodRequest = new Callable<Object>() { @Override public Object call() throws Exception { Object rv = null; try { rv = delegateMethod.invoke(delegate, args); } catch (IllegalArgumentException e) { throw new Exception(e); } catch (IllegalAccessException e) { throw new Exception(e); } catch (InvocationTargetException e) { throw new Exception(e); } return rv; } }; Future<Object> future = scheduler.submit(methodRequest); returnValue = future; } else { // 若拦截到的方法调用不是异步方法,则直接转发 delegateMethod = delegate.getClass() .getMethod(method.getName(),method.getParameterTypes()); returnValue = delegateMethod.invoke(delegate, args); } return returnValue; } } /** * 生成一个实现指定接口的 Active Object proxy 实例。 * 对 interf 所定义的异步方法的调用会被装发到 servant 的相应 doXXX 方法。 * @param interf 要实现的 Active Object 接口 * @param servant Active Object 的 Servant 参与者实例 * @param scheduler Active Object 的 Scheduler 参与者实例 * @return Active Object 的 Proxy 参与者实例 */ public static <T> T newInstance(Class<T> interf, Object servant, ExecutorService scheduler) { @SuppressWarnings("unchecked") T f = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class[] { interf }, new DispatchInvocationHandler(servant, scheduler)); return f; } }
清单 7 的代码实现了可复用的 Active Object 模式的 Proxy 参与者 ActiveObjectProxy。ActiveObjectProxy 通过使用 Java 动态代理,动态生成指定接口的代理对象。对该代理对象的异步方法(即返回值类型为 java.util.concurrent.Future 的方法)的调用会被 ActiveObjectProxy 实现 InvocationHandler(DispatchInvocationHandler)所拦截,并转发给 ActiveObjectProxy 的 newInstance 方法中指定的 Servant 处理。
清单 8 所示的代码展示了通过使用 ActiveObjectProxy 快速 Active Object 模式。
清单 8. 基于可复用的 API 快速实现 Active Object 模式
public static void main(String[] args) throws InterruptedException, ExecutionException { SampleActiveObject sao = ActiveObjectProxy.newInstance( SampleActiveObject.class, new SampleActiveObjectImpl(), Executors.newCachedThreadPool()); Future<String> ft = sao.process("Something", 1); Thread.sleep(500); System.out.println(ft.get());
从清单 8 的代码可见,利用可复用的 Active Object 模式 Proxy 实现,应用开发人员只要指定 Active Object 模式对外保留的接口(对应 ActiveObjectProxy.newInstance 方法的第 1 个参数),并提供一个该接口的实现类(对应 ActiveObjectProxy.newInstance 方法的第 2 个参数),再指定一个 java.util.concurrent.ExecutorService 实例(对应 ActiveObjectProxy.newInstance 方法的第 3 个参数)即可以实现 Active Object 模式。
总结
本文介绍了 Active Object 模式的意图及架构。并提供了一个实际的案例用于展示使用 Java 代码实现 Active Object 模式,在此基础上对该模式进行了评价并分享了在实际运用该模式时需要注意的事项。
参考资源
- 本文的源代码在线阅读: https://github.com/Viscent/JavaConcurrencyPattern/
- 维基百科 Active Object 模式词条: http://en.wikipedia.org/wiki/Active_object
- Douglas C. Schmidt 对 Active Object 模式的定义: http://www.laputan.org/pub/sag/act-obj.pdf。
- Schmidt, Douglas et al. Pattern-Oriented Software Architecture Volume 2: Patterns for Concurrent and Networked Objects. Volume 2. Wiley, 2000
- Java theory and practice: Decorating with dynamic proxies: http://www.ibm.com/developerworks/java/library/j-jtp08305/index.html
感谢张龙对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论