写点什么

Java 多线程编程模式实战指南一:Active Object 模式(上)

  • 2014-11-21
  • 本文字数:9157 字

    阅读完需:约 30 分钟

Active Object 模式简介

Active Object 模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object 模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc() 这个方法:客户端代码调用完 gc() 后,一个进行垃圾回收的任务被提交,但此时 JVM 并不一定进行了垃圾回收,而可能是在 gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc() 的调用方代码是运行在自己的线程上(通常是 main 线程派生的子线程),而 JVM 的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc() 这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍 Active Object 模式,我们可先简单地将其核心理解为一个名为 ActiveObject 的类,该类对外暴露了一些异步方法,如图 1 所示。

图 1. ActiveObject 对象示例

doSomething 方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething 方法会被多个线程调用。这时所需的线程安全控制封装在 doSomething 方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个 Active Object 对象的方法与调用普通 Java 对象的方法并无太大差别。如清单 1 所示。

清单 1. Active Object 方法调用示例

复制代码
ActiveObject ao=...;
Future<string> future = ao.doSomething("data");
// 执行其它操作
String result = future.get();
System.out.println(result);
</string>

Active Object 模式的架构

当 Active Object 模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入 Active Object 模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用 Active Object 模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object 模式的主要参与者有以下几种。其类图如图 2 所示。

图 2. Active Object 模式的类图

(点击图像放大)

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法 doSomething 时,该方法会生成一个相应的 MethodRequest 实例并将其存储到 Scheduler 所维护的缓冲区中。doSomething 方法的返回值是一个表示其执行结果的外包装对象:Future 参与者的实例。异步方法 doSomething 运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对 Proxy 实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将 Proxy 的异步方法的调用和执行分离成为可能。其 call 方法会根据其所包含上下文信息调用 Servant 实例的相应方法。
  • ActivationQueue:负责临时存储由 Proxy 的异步方法被调用时所创建的 MethodRequest 实例的缓冲区。
  • Scheduler:负责将 Proxy 的异步方法所创建的 MethodRequest 实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的 MethodRequest 实例进行执行。其调度策略可以根据实际需要来定,如 FIFO、LIFO 和根据 MethodRequest 中包含的信息所定的优先级等。
  • Servant:负责对 Proxy 所暴露的异步方法的具体实现。
  • Future:负责存储和返回 Active Object 异步方法的执行结果。

Active Object 模式的序列图如图 3 所示。

图 3. Active Object 模式的序列图

(点击图像放大)

第1 步:调用方代码调用Proxy 的异步方法doSomething。

第2~7 步:doSomething 方法创建Future 实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest 对象作为参数调用Scheduler 的enqueue 方法,以将MethodRequest 对象存入缓冲区。Scheduler 的enqueue 方法会调用Scheduler 所维护的ActivationQueue 实例的enqueue 方法,将MethodRequest 对象存入缓冲区。

第8 步:doSomething 返回其所创建的Future 实例。

第9 步:Scheduler 实例采用专门的工作线程运行dispatch 方法。

第10~12 步:dispatch 方法调用ActivationQueue 实例的dequeue 方法,获取一个MethodRequest 对象。然后调用MethodRequest 对象的call 方法

第13~16 步:MethodRequest 对象的call 方法调用与其关联的Servant 实例的相应方法doSomething。并将Servant.doSomething 方法的返回值设置到Future 实例上。

第17 步:MethodRequest 对象的call 方法返回。

上述步骤中,第1~8 步是运行在Active Object 的调用者线程中的,这几个步骤实现了将调用方代码对Active Object 所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17 步是运行在Active Object 的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object 对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active Object 的异步方法的返回值,则可以在其需要时,调用Future 实例的get 方法来获得异步方法的真正执行结果。

Active Object 模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户 13612345678 给其同事 13787654321 发送彩信时,可以将接收方号码填写为对方的短号,如 776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过 Java 的对象序列化 API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及 Active Object 模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件 I/O 这种慢的操作,我们不希望它在请求处理的主线程(即 Web 服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得 Web 服务器的工作线程因等待文件 I/O 而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object 模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在 Web 服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在 Active Object 工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被 Active Object 线程缓存到文件中。如图 4 所示。

图 4 . 异步实现缓存

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如 2000 个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如 100 个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而 Active Object 模式中的 Proxy 参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用 Active Object 模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的 Active Object 模式的客调用方代码。如清单 2 所示。

清单 2. 彩信下发请求处理的入口类

复制代码
public class MMSDeliveryServlet extends HttpServlet {
private static final long serialVersionUID = 5886933373599895099L;
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
// 将请求中的数据解析为内部对象
MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
Recipient originalNumberRecipient = null;
try {
// 将接收方短号转换为长号
originalNumberRecipient = convertShortNumber(shortNumberRecipient);
} catch (SQLException e) {
// 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
AsyncRequestPersistence.getInstance().store(mmsDeliverReq);
// 继续对当前请求的其它处理,如给客户端响应
resp.setStatus(202);
}
}
private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
// 省略其它代码
return mmsDeliverReq;
}
private Recipient convertShortNumber(Recipient shortNumberRecipient)
throws SQLException {
Recipient recipent = null;
// 省略其它代码
return recipent;
}
}

清单 2 中的 doPost 方法在侦测到短号转换过程中发生的数据库异常后,通过调用 AsyncRequestPersistence 类的 store 方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence 类相当于 Active Object 模式中的 Proxy 参与者。尽管本案例涉及的是一个并发环境,但从清单 2 中的代码可见,AsyncRequestPersistence 类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在 AsyncRequestPersistence 类之后。

AsyncRequestPersistence 类的代码如清单 3 所示。

清单 3. 彩信下发请求缓存入口类(Active Object 模式的 Proxy)

复制代码
// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
private static final long ONE_MINUTE_IN_SECONDS = 60;
private final Logger logger;
private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
// ActiveObjectPattern.Servant
private final DiskbasedRequestPersistence
delegate = new DiskbasedRequestPersistence();
// ActiveObjectPattern.Scheduler
private final ThreadPoolExecutor scheduler;
private static class InstanceHolder {
final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}
private AsyncRequestPersistence() {
logger = Logger.getLogger(AsyncRequestPersistence.class);
scheduler = new ThreadPoolExecutor(1, 3,
60 * ONE_MINUTE_IN_SECONDS,
TimeUnit.SECONDS,
// ActiveObjectPattern.ActivationQueue
new LinkedBlockingQueue<runnable>(200),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t;
t = new Thread(r, "AsyncRequestPersistence");
return t;
}
});
scheduler.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardOldestPolicy());
// 启动队列监控定时任务
Timer monitorTimer = new Timer(true);
monitorTimer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run() {
if (logger.isInfoEnabled()) {
logger.info("task count:"
+ requestSubmittedPerIterval
+ ",Queue size:"
+ scheduler.getQueue().size()
+ ",taskTimeConsumedPerInterval:"
+ taskTimeConsumedPerInterval.get()
+ " ms");
}
taskTimeConsumedPerInterval.set(0);
requestSubmittedPerIterval.set(0);
}
}, 0, ONE_MINUTE_IN_SECONDS * 1000);
}
public static RequestPersistence getInstance() {
return InstanceHolder.INSTANCE;
}
@Override
public void store(final MMSDeliverRequest request) {
/*
* 将对 store 方法的调用封装成 MethodRequest 对象, 并存入缓冲区。
*/
// ActiveObjectPattern.MethodRequest
Callable<boolean> methodRequest = new Callable<boolean>() {
@Override
public Boolean call() throws Exception {
long start = System.currentTimeMillis();
try {
delegate.store(request);
} finally {
taskTimeConsumedPerInterval.addAndGet(
System.currentTimeMillis() - start);
}
return Boolean.TRUE;
}
};
scheduler.submit(methodRequest);
requestSubmittedPerIterval.incrementAndGet();
}
}
</boolean></boolean></runnable>

AsyncRequestPersistence 类所实现的接口 RequestPersistence 定义了 Active Object 对外暴露的异步方法:store 方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单 4 所示。

清单 4. RequestPersistence 接口源码

复制代码
public interface RequestPersistence {
void store(MMSDeliverRequest request);
}

AsyncRequestPersistence 类的实例变量 scheduler 相当于 Active Object 模式中的 Scheduler 参与者实例。这里我们直接使用了 JDK1.5 引入的 Executor Framework 中的 ThreadPoolExecutor。在 ThreadPoolExecutor 类的实例化时,其构造器的第 5 个参数(BlockingQueue workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue(200)。该队列相当于 Active Object 模式中的 ActivationQueue 参与者实例。

AsyncRequestPersistence 类的实例变量 delegate 相当于 Active Object 模式中的 Servant 参与者实例。

AsyncRequestPersistence 类的 store 方法利用匿名类生成一个 java.util.concurrent.Callable 实例 methodRequest。该实例相当于 Active Object 模式中的 MethodRequest 参与者实例。利用闭包(Closure),该实例封装了对 store 方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence 类的 store 方法通过调用 scheduler 的 submit 方法,将 methodRequest 送入 ThreadPoolExecutor 所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor 是 Scheduler 参与者的一个“近似”实现。ThreadPoolExecutor 的 submit 方法相对于 Scheduler 的 enqueue 方法,该方法用于接纳 MethodRequest 对象,以将其存入缓冲区。当 ThreadPoolExecutor 当前使用的线程数量小于其核心线程数量时,submit 方法所接收的任务会直接被新建的线程执行。当 ThreadPoolExecutor 当前使用的线程数量大于其核心线程数时,submit 方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor 的这种任务处理机制,并不妨碍我们将它用作 Scheduler 的实现。

methodRequest 的 call 方法会调用 delegate 的 store 方法来真正实现请求缓存功能。delegate 实例对应的类 DiskbasedRequestPersistence 是请求消息缓存功能的真正实现者。其代码如清单 5 所示。

清单 5. DiskbasedRequestPersistence 类的源码

复制代码
public class DiskbasedRequestPersistence implements RequestPersistence {
// 负责缓存文件的存储管理
private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
private final Logger logger = Logger
.getLogger(DiskbasedRequestPersistence.class);
@Override
public void store(MMSDeliverRequest request) {
// 申请缓存文件的文件名
String[] fileNameParts = storage.apply4Filename(request);
File file = new File(fileNameParts[0]);
try {
ObjectOutputStream objOut = new ObjectOutputStream(
new FileOutputStream(file));
try {
objOut.writeObject(request);
} finally {
objOut.close();
}
} catch (FileNotFoundException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
} catch (IOException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
}
}
class SectionBasedDiskStorage {
private Deque<string> sectionNames = new LinkedList<string>();
/*
* Key->value: 存储子目录名 -> 子目录下缓存文件计数器
*/
private Map<string atomicinteger=""> sectionFileCountMap
= new HashMap<string atomicinteger="">();
private int maxFilesPerSection = 2000;
private int maxSectionCount = 100;
private String storageBaseDir = System.getProperty("user.dir") + "/vpn";
private final Object sectionLock = new Object();
public String[] apply4Filename(MMSDeliverRequest request) {
String sectionName;
int iFileCount;
boolean need2RemoveSection = false;
String[] fileName = new String[2];
synchronized (sectionLock) {
// 获取当前的存储子目录名
sectionName = this.getSectionName();
AtomicInteger fileCount;
fileCount = sectionFileCountMap.get(sectionName);
iFileCount = fileCount.get();
// 当前存储子目录已满
if (iFileCount >= maxFilesPerSection) {
if (sectionNames.size() >= maxSectionCount) {
need2RemoveSection = true;
}
// 创建新的存储子目录
sectionName = this.makeNewSectionDir();
fileCount = sectionFileCountMap.get(sectionName);
}
iFileCount = fileCount.addAndGet(1);
}
fileName[0] = storageBaseDir + "/" + sectionName + "/"
+ new DecimalFormat("0000").format(iFileCount) + "-"
+ request.getTimeStamp().getTime() / 1000 + "-"
+ request.getExpiry()
+ ".rq";
fileName[1] = sectionName;
if (need2RemoveSection) {
// 删除最老的存储子目录
String oldestSectionName = sectionNames.removeFirst();
this.removeSection(oldestSectionName);
}
return fileName;
}
public void decrementSectionFileCount(String sectionName) {
AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
if (null != fileCount) {
fileCount.decrementAndGet();
}
}
private boolean removeSection(String sectionName) {
boolean result = true;
File dir = new File(storageBaseDir + "/" + sectionName);
for (File file : dir.listFiles()) {
result = result && file.delete();
}
result = result && dir.delete();
return result;
}
private String getSectionName() {
String sectionName;
if (sectionNames.isEmpty()) {
sectionName = this.makeNewSectionDir();
} else {
sectionName = sectionNames.getLast();
}
return sectionName;
}
private String makeNewSectionDir() {
String sectionName;
SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
sectionName = sdf.format(new Date());
File dir = new File(storageBaseDir + "/" + sectionName);
if (dir.mkdir()) {
sectionNames.addLast(sectionName);
sectionFileCountMap.put(sectionName, new AtomicInteger(0));
} else {
throw new RuntimeException(
"Cannot create section dir " + sectionName);
}
return sectionName;
}
}
}
</string>,></string>,></string></string>

methodRequest 的 call 方法的调用者代码是运行在 ThreadPoolExecutor 所维护的工作者线程中,这就保证了 store 方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor 所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence 类的 store 方法中调用的 SectionBasedDiskStorage 类的 apply4Filename 方法包含了一些多线程同步控制代码(见清单 5)。这部分控制由于是封装在 DiskbasedRequestPersistence 的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence 的调用方代码无法知道该细节,这体现了 Active Object 模式对并发访问控制的封装。

小结

本篇介绍了 Active Object 模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对 Active Object 模式进行评价,并结合本文案例介绍实际运用 Active Object 模式时需要注意的一些事项。


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2014-11-21 09:4524360

评论

发布
暂无评论
发现更多内容

SpringData【Spring整合HibernateJPA】(1)

Java 程序员 后端

SQL Server 2008中的分区表(二):如何添加、查询(1)

Java 程序员 后端

正则表达式 与 XPath 语法领域细解,初学阶段的你,该怎么学?

梦想橡皮擦

11月日更

springcloud(三)网关zuul

Java 程序员 后端

springcloud(二)配置中心config

Java 程序员 后端

Java的jvm与gc概述

小鲍侃java

11月日更

Spring系列之数据源的配置 数据库 数据源 连接池的区别

Java 程序员 后端

面试题:软件测试V模型以及软件生命周期

程序员阿沐

编程 程序员 软件测试 自动化测试 教程

面试官:你说说软件测试WHX模型(图解)

程序员阿沐

程序员 软件测试 自动化测试 测试开发

软件的生命周期(软件工程各阶段的工作)

程序员阿沐

程序员 软件测试 生命周期 测试开发 测试工程师

Spring框架(五)SpringMVC高级

Java 程序员 后端

SpringData【Spring整合HibernateJPA】

Java 程序员 后端

SpringSecurity入门(一)

Java 程序员 后端

Spring新版本抛弃JVM,可独立部署,网友:要自立门户?

Java 程序员 后端

Spring中的AOP——在Advice方法中获取目标方法的参数

Java 程序员 后端

MySQL Operator 02 | 脚手架选型 & 工程创建

RadonDB

MySQL 数据库 Kubernetes RadonDB

SpringMVC--文件上传

Java 程序员 后端

如何在 CentOS 中下载包含所有依赖项的 RPM 包

吴脑的键客

centos

软件测试的策略详解(按开发阶段划分)

程序员阿沐

编程 程序员 软件测试 自动化测试 测试工程师

Spring注解缓存设计原理及实战

Java 程序员 后端

spring的事务隔离级别

Java 程序员 后端

SpringSecurity安全控件使用指南

Java 程序员 后端

东吴证券张之浩:从理论到落地的 DevOps 体系建设

BoCloud博云

DevOps 云原生 证券

Spring(四):bean标签解析

Java 程序员 后端

SpringMVC之Interceptor拦截器之登录拦截器

Java 程序员 后端

未来怎么样的测试工程师最值钱?

程序员阿沐

腾讯 软件测试 自动化测试 测试开发

SpringSecurity+JWT认证流程解析

Java 程序员 后端

Spring之AOP适配器模式

Java 程序员 后端

SpringMVC之Interceptor拦截器之登录拦截器(1)

Java 程序员 后端

SpringMVC入门第二部分

Java 程序员 后端

SpringSecurity详细介绍RememberMe功能

Java 程序员 后端

Java多线程编程模式实战指南一:Active Object模式(上)_Java_黄文海_InfoQ精选文章