东亚银行、岚图汽车带你解锁 AIGC 时代的数字化人才培养各赛道新模式! 了解详情
写点什么

Java 多线程编程模式实战指南一:Active Object 模式(上)

  • 2014-11-21
  • 本文字数:9157 字

    阅读完需:约 30 分钟

Active Object 模式简介

Active Object 模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object 模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc() 这个方法:客户端代码调用完 gc() 后,一个进行垃圾回收的任务被提交,但此时 JVM 并不一定进行了垃圾回收,而可能是在 gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc() 的调用方代码是运行在自己的线程上(通常是 main 线程派生的子线程),而 JVM 的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc() 这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍 Active Object 模式,我们可先简单地将其核心理解为一个名为 ActiveObject 的类,该类对外暴露了一些异步方法,如图 1 所示。

图 1. ActiveObject 对象示例

doSomething 方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething 方法会被多个线程调用。这时所需的线程安全控制封装在 doSomething 方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个 Active Object 对象的方法与调用普通 Java 对象的方法并无太大差别。如清单 1 所示。

清单 1. Active Object 方法调用示例

复制代码
ActiveObject ao=...;
Future<string> future = ao.doSomething("data");
// 执行其它操作
String result = future.get();
System.out.println(result);
</string>

Active Object 模式的架构

当 Active Object 模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入 Active Object 模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用 Active Object 模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object 模式的主要参与者有以下几种。其类图如图 2 所示。

图 2. Active Object 模式的类图

(点击图像放大)

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法 doSomething 时,该方法会生成一个相应的 MethodRequest 实例并将其存储到 Scheduler 所维护的缓冲区中。doSomething 方法的返回值是一个表示其执行结果的外包装对象:Future 参与者的实例。异步方法 doSomething 运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对 Proxy 实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将 Proxy 的异步方法的调用和执行分离成为可能。其 call 方法会根据其所包含上下文信息调用 Servant 实例的相应方法。
  • ActivationQueue:负责临时存储由 Proxy 的异步方法被调用时所创建的 MethodRequest 实例的缓冲区。
  • Scheduler:负责将 Proxy 的异步方法所创建的 MethodRequest 实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的 MethodRequest 实例进行执行。其调度策略可以根据实际需要来定,如 FIFO、LIFO 和根据 MethodRequest 中包含的信息所定的优先级等。
  • Servant:负责对 Proxy 所暴露的异步方法的具体实现。
  • Future:负责存储和返回 Active Object 异步方法的执行结果。

Active Object 模式的序列图如图 3 所示。

图 3. Active Object 模式的序列图

(点击图像放大)

第1 步:调用方代码调用Proxy 的异步方法doSomething。

第2~7 步:doSomething 方法创建Future 实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest 对象作为参数调用Scheduler 的enqueue 方法,以将MethodRequest 对象存入缓冲区。Scheduler 的enqueue 方法会调用Scheduler 所维护的ActivationQueue 实例的enqueue 方法,将MethodRequest 对象存入缓冲区。

第8 步:doSomething 返回其所创建的Future 实例。

第9 步:Scheduler 实例采用专门的工作线程运行dispatch 方法。

第10~12 步:dispatch 方法调用ActivationQueue 实例的dequeue 方法,获取一个MethodRequest 对象。然后调用MethodRequest 对象的call 方法

第13~16 步:MethodRequest 对象的call 方法调用与其关联的Servant 实例的相应方法doSomething。并将Servant.doSomething 方法的返回值设置到Future 实例上。

第17 步:MethodRequest 对象的call 方法返回。

上述步骤中,第1~8 步是运行在Active Object 的调用者线程中的,这几个步骤实现了将调用方代码对Active Object 所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17 步是运行在Active Object 的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object 对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active Object 的异步方法的返回值,则可以在其需要时,调用Future 实例的get 方法来获得异步方法的真正执行结果。

Active Object 模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户 13612345678 给其同事 13787654321 发送彩信时,可以将接收方号码填写为对方的短号,如 776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过 Java 的对象序列化 API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及 Active Object 模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件 I/O 这种慢的操作,我们不希望它在请求处理的主线程(即 Web 服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得 Web 服务器的工作线程因等待文件 I/O 而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object 模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在 Web 服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在 Active Object 工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被 Active Object 线程缓存到文件中。如图 4 所示。

图 4 . 异步实现缓存

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如 2000 个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如 100 个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而 Active Object 模式中的 Proxy 参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用 Active Object 模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的 Active Object 模式的客调用方代码。如清单 2 所示。

清单 2. 彩信下发请求处理的入口类

复制代码
public class MMSDeliveryServlet extends HttpServlet {
private static final long serialVersionUID = 5886933373599895099L;
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
// 将请求中的数据解析为内部对象
MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
Recipient originalNumberRecipient = null;
try {
// 将接收方短号转换为长号
originalNumberRecipient = convertShortNumber(shortNumberRecipient);
} catch (SQLException e) {
// 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
AsyncRequestPersistence.getInstance().store(mmsDeliverReq);
// 继续对当前请求的其它处理,如给客户端响应
resp.setStatus(202);
}
}
private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
// 省略其它代码
return mmsDeliverReq;
}
private Recipient convertShortNumber(Recipient shortNumberRecipient)
throws SQLException {
Recipient recipent = null;
// 省略其它代码
return recipent;
}
}

清单 2 中的 doPost 方法在侦测到短号转换过程中发生的数据库异常后,通过调用 AsyncRequestPersistence 类的 store 方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence 类相当于 Active Object 模式中的 Proxy 参与者。尽管本案例涉及的是一个并发环境,但从清单 2 中的代码可见,AsyncRequestPersistence 类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在 AsyncRequestPersistence 类之后。

AsyncRequestPersistence 类的代码如清单 3 所示。

清单 3. 彩信下发请求缓存入口类(Active Object 模式的 Proxy)

复制代码
// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
private static final long ONE_MINUTE_IN_SECONDS = 60;
private final Logger logger;
private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
// ActiveObjectPattern.Servant
private final DiskbasedRequestPersistence
delegate = new DiskbasedRequestPersistence();
// ActiveObjectPattern.Scheduler
private final ThreadPoolExecutor scheduler;
private static class InstanceHolder {
final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}
private AsyncRequestPersistence() {
logger = Logger.getLogger(AsyncRequestPersistence.class);
scheduler = new ThreadPoolExecutor(1, 3,
60 * ONE_MINUTE_IN_SECONDS,
TimeUnit.SECONDS,
// ActiveObjectPattern.ActivationQueue
new LinkedBlockingQueue<runnable>(200),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t;
t = new Thread(r, "AsyncRequestPersistence");
return t;
}
});
scheduler.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardOldestPolicy());
// 启动队列监控定时任务
Timer monitorTimer = new Timer(true);
monitorTimer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run() {
if (logger.isInfoEnabled()) {
logger.info("task count:"
+ requestSubmittedPerIterval
+ ",Queue size:"
+ scheduler.getQueue().size()
+ ",taskTimeConsumedPerInterval:"
+ taskTimeConsumedPerInterval.get()
+ " ms");
}
taskTimeConsumedPerInterval.set(0);
requestSubmittedPerIterval.set(0);
}
}, 0, ONE_MINUTE_IN_SECONDS * 1000);
}
public static RequestPersistence getInstance() {
return InstanceHolder.INSTANCE;
}
@Override
public void store(final MMSDeliverRequest request) {
/*
* 将对 store 方法的调用封装成 MethodRequest 对象, 并存入缓冲区。
*/
// ActiveObjectPattern.MethodRequest
Callable<boolean> methodRequest = new Callable<boolean>() {
@Override
public Boolean call() throws Exception {
long start = System.currentTimeMillis();
try {
delegate.store(request);
} finally {
taskTimeConsumedPerInterval.addAndGet(
System.currentTimeMillis() - start);
}
return Boolean.TRUE;
}
};
scheduler.submit(methodRequest);
requestSubmittedPerIterval.incrementAndGet();
}
}
</boolean></boolean></runnable>

AsyncRequestPersistence 类所实现的接口 RequestPersistence 定义了 Active Object 对外暴露的异步方法:store 方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单 4 所示。

清单 4. RequestPersistence 接口源码

复制代码
public interface RequestPersistence {
void store(MMSDeliverRequest request);
}

AsyncRequestPersistence 类的实例变量 scheduler 相当于 Active Object 模式中的 Scheduler 参与者实例。这里我们直接使用了 JDK1.5 引入的 Executor Framework 中的 ThreadPoolExecutor。在 ThreadPoolExecutor 类的实例化时,其构造器的第 5 个参数(BlockingQueue workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue(200)。该队列相当于 Active Object 模式中的 ActivationQueue 参与者实例。

AsyncRequestPersistence 类的实例变量 delegate 相当于 Active Object 模式中的 Servant 参与者实例。

AsyncRequestPersistence 类的 store 方法利用匿名类生成一个 java.util.concurrent.Callable 实例 methodRequest。该实例相当于 Active Object 模式中的 MethodRequest 参与者实例。利用闭包(Closure),该实例封装了对 store 方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence 类的 store 方法通过调用 scheduler 的 submit 方法,将 methodRequest 送入 ThreadPoolExecutor 所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor 是 Scheduler 参与者的一个“近似”实现。ThreadPoolExecutor 的 submit 方法相对于 Scheduler 的 enqueue 方法,该方法用于接纳 MethodRequest 对象,以将其存入缓冲区。当 ThreadPoolExecutor 当前使用的线程数量小于其核心线程数量时,submit 方法所接收的任务会直接被新建的线程执行。当 ThreadPoolExecutor 当前使用的线程数量大于其核心线程数时,submit 方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor 的这种任务处理机制,并不妨碍我们将它用作 Scheduler 的实现。

methodRequest 的 call 方法会调用 delegate 的 store 方法来真正实现请求缓存功能。delegate 实例对应的类 DiskbasedRequestPersistence 是请求消息缓存功能的真正实现者。其代码如清单 5 所示。

清单 5. DiskbasedRequestPersistence 类的源码

复制代码
public class DiskbasedRequestPersistence implements RequestPersistence {
// 负责缓存文件的存储管理
private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
private final Logger logger = Logger
.getLogger(DiskbasedRequestPersistence.class);
@Override
public void store(MMSDeliverRequest request) {
// 申请缓存文件的文件名
String[] fileNameParts = storage.apply4Filename(request);
File file = new File(fileNameParts[0]);
try {
ObjectOutputStream objOut = new ObjectOutputStream(
new FileOutputStream(file));
try {
objOut.writeObject(request);
} finally {
objOut.close();
}
} catch (FileNotFoundException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
} catch (IOException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
}
}
class SectionBasedDiskStorage {
private Deque<string> sectionNames = new LinkedList<string>();
/*
* Key->value: 存储子目录名 -> 子目录下缓存文件计数器
*/
private Map<string atomicinteger=""> sectionFileCountMap
= new HashMap<string atomicinteger="">();
private int maxFilesPerSection = 2000;
private int maxSectionCount = 100;
private String storageBaseDir = System.getProperty("user.dir") + "/vpn";
private final Object sectionLock = new Object();
public String[] apply4Filename(MMSDeliverRequest request) {
String sectionName;
int iFileCount;
boolean need2RemoveSection = false;
String[] fileName = new String[2];
synchronized (sectionLock) {
// 获取当前的存储子目录名
sectionName = this.getSectionName();
AtomicInteger fileCount;
fileCount = sectionFileCountMap.get(sectionName);
iFileCount = fileCount.get();
// 当前存储子目录已满
if (iFileCount >= maxFilesPerSection) {
if (sectionNames.size() >= maxSectionCount) {
need2RemoveSection = true;
}
// 创建新的存储子目录
sectionName = this.makeNewSectionDir();
fileCount = sectionFileCountMap.get(sectionName);
}
iFileCount = fileCount.addAndGet(1);
}
fileName[0] = storageBaseDir + "/" + sectionName + "/"
+ new DecimalFormat("0000").format(iFileCount) + "-"
+ request.getTimeStamp().getTime() / 1000 + "-"
+ request.getExpiry()
+ ".rq";
fileName[1] = sectionName;
if (need2RemoveSection) {
// 删除最老的存储子目录
String oldestSectionName = sectionNames.removeFirst();
this.removeSection(oldestSectionName);
}
return fileName;
}
public void decrementSectionFileCount(String sectionName) {
AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
if (null != fileCount) {
fileCount.decrementAndGet();
}
}
private boolean removeSection(String sectionName) {
boolean result = true;
File dir = new File(storageBaseDir + "/" + sectionName);
for (File file : dir.listFiles()) {
result = result && file.delete();
}
result = result && dir.delete();
return result;
}
private String getSectionName() {
String sectionName;
if (sectionNames.isEmpty()) {
sectionName = this.makeNewSectionDir();
} else {
sectionName = sectionNames.getLast();
}
return sectionName;
}
private String makeNewSectionDir() {
String sectionName;
SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
sectionName = sdf.format(new Date());
File dir = new File(storageBaseDir + "/" + sectionName);
if (dir.mkdir()) {
sectionNames.addLast(sectionName);
sectionFileCountMap.put(sectionName, new AtomicInteger(0));
} else {
throw new RuntimeException(
"Cannot create section dir " + sectionName);
}
return sectionName;
}
}
}
</string>,></string>,></string></string>

methodRequest 的 call 方法的调用者代码是运行在 ThreadPoolExecutor 所维护的工作者线程中,这就保证了 store 方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor 所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence 类的 store 方法中调用的 SectionBasedDiskStorage 类的 apply4Filename 方法包含了一些多线程同步控制代码(见清单 5)。这部分控制由于是封装在 DiskbasedRequestPersistence 的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence 的调用方代码无法知道该细节,这体现了 Active Object 模式对并发访问控制的封装。

小结

本篇介绍了 Active Object 模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对 Active Object 模式进行评价,并结合本文案例介绍实际运用 Active Object 模式时需要注意的一些事项。


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2014-11-21 09:4523626

评论

发布
暂无评论
发现更多内容

Tech Talk 活动回顾|化“被动”为“主动”,如何构建安全合规的智能产品

亚马逊云科技 (Amazon Web Services)

产品 安全 解决方案

区块链技术已站上真正意义的风口,如何把握?

CECBC

【刷题第15天】剑指 Offer 09. 用两个栈实现队列

白日梦

5月月更

实现compact table command

Asha

【Go实现】实践GoF的23种设计模式:工厂方法模式

元闰子

Go 设计模式 工厂方法模式

拆分电商系统为微服务

Dean.Zhang

Amazon Personalize 个性化效果评估,从准确性到多样性、新颖性和偶然性

亚马逊云科技 (Amazon Web Services)

Amazon 模型

【Meetup 预告】OpenMLDB x DolphinScheduler 链接特征工程与调度环节,打造端到端 MLOps 工作流

第四范式开发者社区

人工智能 机器学习 数据库 调度 特征工程

深入剖析 | snowflake算法

九叔(高翔龙)

算法 雪花算法 uuid 全局唯一ID snowflake

跨平台应用开发进阶(十) :uni-app 实现数据存储、获取和删除

No Silver Bullet

uni-app 数据存储 5月月更 全局

如何在你的 wordpress 网站中添加搜索框?

海拥(haiyong.site)

WordPress 5月月更

自学历程 小甲鱼Python

万里无云万里天

Python

全链路压测(十三):高可用和性能优化

老张

性能测试 全链路压测 稳定性保障

模块6作业提交

KennyQ

druid源码学习十

Nick

Apache Druid

百尺竿头更进一步丨拓展 Amazon Aurora 的读写能力之 Gaea 篇

亚马逊云科技 (Amazon Web Services)

Amazon 环境搭建

单片机上常用-GB2312、GBK汉字取模与字库偏移地址的计算与汉字描点

DS小龙哥

5月月更

跨平台应用开发进阶(九) :uni-app 实现Android原生APP-本地打包集成极光推送(JG-JPUSH)详细教程

No Silver Bullet

uni-app Andriod 极光推送 5月月更 本地打包

程序员如何保证自己开发的正确性——测试开发有感

Bruce Talk

技术 敏捷 TDD Agile

设计者模式之装饰者模式

乌龟哥哥

5月月更

SAP UI5 OData V4 模型的构造方式

Jerry Wang

JavaScript 前端 SAP ui5 5月月更

【LeetCode】在长度 2N 的数组中找出重复 N 次的元素Java题解

Albert

LeetCode 5月月更

Linux环境编译多个C程序文件

Loken

音视频 5月月更

Docker下Java文件上传服务三部曲之三

程序员欣宸

Java Docker 5月月更

Amazon MSK Serverless 现已正式推出,无需再为托管式 Kafka 集群进行容量规划

亚马逊云科技 (Amazon Web Services)

kafka Serverless

Kubectl-ice 插件展示集群容器配置信息更强大、更便捷

Marionxue

kubectl插件 kubectl-ice 容器配置

模块六:作业

本人法海

「架构实战营」

后端开发【一大波干货知识】Redis中的IO多线程(线程池)

C++后台开发

redis 多线程 线程池 后端开发 C++后台开发

python小知识-python 函数二三事

AIWeker

Python 5月月更

八、浅谈云原生监控

穿过生命散发芬芳

云原生 5月月更

滑动窗口

工程师日月

算法 5月月更

Java多线程编程模式实战指南一:Active Object模式(上)_Java_黄文海_InfoQ精选文章