Java 多线程编程模式实战指南之 Promise 模式

阅读数:19359 2015 年 12 月 10 日

话题:Java设计模式语言 & 开发架构

编者按:InfoQ 开设新栏目“品味书香”,精选技术书籍的精彩章节,以及分享看完书留下的思考和收获,欢迎大家关注。本文节选自黄文海著《Java 多线程编程实战指南(设计模式篇)》中第 6 章,介绍了 Java 中 Promise 模式的用法和实例。

本书其它部分内容也在本站发表过,详见:

Promise 模式简介

Promise 模式是一种异步编程模式 。它使得我们可以先开始一个任务的执行,并得到一个用于获取该任务执行结果的凭据对象,而不必等待该任务执行完毕就可以继续执行其他操作。等到我们需要该任务的执行结果时,再调用凭据对象的相关方法来获取。这样就避免了不必要的等待,增加了系统的并发性。这好比我们去小吃店,同时点了鸭血粉丝汤和生煎包。当我们点餐付完款后,我们拿到手的其实只是一张可借以换取相应食品的收银小票(凭据对象)而已,而不是对应的实物。由于鸭血粉丝汤可以较快制作好,故我们可以凭收银小票即刻兑换到。而生煎包的制作则比较耗时,因此我们可以先吃拿到手的鸭血粉丝汤,而不必饿着肚子等生煎包出炉再一起吃。等到我们把鸭血粉丝汤吃得差不多的时候,生煎包可能也出炉了,这时我们再凭收银小票去换取生煎包,如图 6-1 所示。

图 6-1.Promise 模式的日常生活例子

Promise 模式的架构

Promise 模式中,客户端代码调用某个异步方法所得到的返回值仅是一个凭据对象(该对象被称为 Promise,意为“承诺”)。凭借该对象,客户端代码可以获取异步方法相应的真正任务的执行结果。为了讨论方便,下文我们称异步方法对应的真正的任务为异步任务。

Promise 模式的主要参与者有以下几种。其类图如图 6-2 所示。

图 6-2.Promise 模式的类图

  • Promisor:负责对外暴露可以返回 Promise 对象的异步方法,并启动异步任务的执行。其主要方法及职责如下。
    • compute:启动异步任务的执行,并返回用于获取异步任务执行结果的凭据对象。
  • ​​Promise:包装异步任务处理结果的凭据对象。负责检测异步任务是否处理完毕、返回和存储异步任务处理结果。其主要方法及职责如下。
    • getResult:获取与其所属 Promise 实例关联的异步任务的执行结果。
    • setResult:设置与其所属 Promise 实例关联的异步任务的执行结果。
    • isDone:检测与其所属 Promise 实例关联的异步任务是否执行完毕。
  • Result:负责表示异步任务处理结果。具体类型由应用决定。
  • TaskExecutor:负责真正执行异步任务所代表的计算,并将其计算结果设置到相应的 Promise 实例。其主要方法及职责如下
    • run:执行异步任务所代表的计算。

客户端代码获取异步任务处理结果的过程如图 6-3 所示的序列图。

图 6-3.获取异步任务的处理结果

第 1 步:客户端代码调用 Promisor 的异步方法 compute。

第 2、3 步:compute 方法创建 Promise 实例作为该方法的返回值,并返回。

第 4 步:客户端代码调用其所得到的 Promise 对象的 getResult 方法来获取异步任务处理结果。如果此时异步任务执行尚未完成,则 getResult 方法会阻塞(即调用方代码的运行线程暂时处于阻塞状态)。

异步任务的真正执行以及其处理结果的设置如图 6-4 所示的序列图。

图 6-4.设置异步任务的处理结果

第 1 步:Promisor 的异步方法 compute 创建 TaskExecutor 实例。

第 2 步:TaskExecutor 的 run 方法被执行(可以由专门的线程或者线程池 来调用 run 方法)。

第 3 步:run 方法创建表示其执行结果的 Result 实例。

第 4、5 步:run 方法将其处理结果设置到相应的 Promise 实例上。

Promise 模式实战案例解析

某系统的一个数据同步模块需要将一批本地文件上传到指定的目标 FTP 服务器上。这些文件是根据页面中的输入条件查询数据库的相应记录生成的。在将文件上传到目标服务器之前,需要对 FTP 客户端实例进行初始化(包括与对端服务器建立网络连接、向服务器发送登录用户和向服务器发送登录密码)。而 FTP 客户端实例初始化这个操作比较耗时间,我们希望它尽可能地在本地文件上传之前准备就绪。因此我们可以引入异步编程,使得 FTP 客户端实例初始化和本地文件上传这两个任务能够并发执行,减少不必要的等待。另一方面,我们不希望这种异步编程增加了代码编写的复杂性。这时,Promise 模式就可以派上用场了:先开始 FTP 客户端实例的初始化,并得到一个获取 FTP 客户端实例的凭据对象。在不必等待 FTP 客户端实例初始化完毕的情况下,每生成一个本地文件,就通过凭据对象获取 FTP 客户端实例,再通过该 FTP 客户端实例将文件上传到目标服务器上。代码如清单 6-1 所示 。

清单 6-1.数据同步模块的入口类

复制代码
public class DataSyncTask implements Runnable {
private final Map<String, String> taskParameters;
public DataSyncTask(Map<String, String> taskParameters) {
this.taskParameters = taskParameters;
}
@Override
public void run() {
String ftpServer = taskParameters.get("server");
String ftpUserName = taskParameters.get("userName");
String password = taskParameters.get("password");
// 先开始初始化 FTP 客户端实例
Future<FTPClientUtil> ftpClientUtilPromise = FTPClientUtil.newInstance(
ftpServer, ftpUserName, password);
// 查询数据库生成本地文件
generateFilesFromDB();
FTPClientUtil ftpClientUtil = null;
try {
// 获取初始化完毕的 FTP 客户端实例
ftpClientUtil = ftpClientUtilPromise.get();
} catch (InterruptedException e) {
;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 上传文件
uploadFiles(ftpClientUtil);
// 省略其他代码
}
private void generateFilesFromDB() {
// 省略其他代码
}
private void uploadFiles(FTPClientUtil ftpClientUtil) {
Set<File> files = retrieveGeneratedFiles();
for (File file : files) {
try {
ftpClientUtil.upload(file);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private Set<File> retrieveGeneratedFiles() {
Set<File> files = new HashSet<File>();
// 省略其他代码
return files;
}
}

从清单 6-1 的代码中可以看出,DataSyncTask 类的 run 方法先开始 FTP 客户端实例的初始化,并得到获取相应 FTP 客户端实例的凭据对象 ftpClientUtilPromise。接着,它直接开始查询数据库并生成本地文件。而此时,FTP 客户端实例的初始化可能尚未完成。在本地文件生成之后,run 方法通过调用 ftpClientUtilPromise 的 get 方法来获取相应的 FTP 客户端实例。此时,如果相应的 FTP 客户端实例的初始化仍未完成,则该调用会阻塞,直到相应的 FTP 客户端实例的初始化完成或者失败。run 方法获取到 FTP 客户端实例后,调用其 upload 方法将文件上传到指定的 FTP 服务器。

清单 6-1 代码所引用的 FTP 客户端工具类 FTPClientUtil 的代码如清单 6-2 所示。

清单 6-2.FTP 客户端工具类源码

复制代码
// 模式角色:Promise.Promisor、Promise.Result
public class FTPClientUtil {
private final FTPClient ftp = new FTPClient();
private final Map<String, Boolean> dirCreateMap = new HashMap<String, Boolean>();
private FTPClientUtil() {
}
// 模式角色:Promise.Promisor.compute
public static Future<FTPClientUtil> newInstance(final String ftpServer,
final String userName, final String password) {
Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
@Override
public FTPClientUtil call() throws Exception {
FTPClientUtil self = new FTPClientUtil();
self.init(ftpServer, userName, password);
return self;
}
};
//task 相当于模式角色:Promise.Promise
final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(
callable);
/*
下面这行代码与本案例的实际代码并不一致,这是为了讨论方便。
下面新建的线程相当于模式角色:Promise.TaskExecutor
*/
new Thread(task).start();
return task;
}
private void init(String ftpServer, String userName, String password)
throws Exception {
FTPClientConfig config = new FTPClientConfig();
ftp.configure(config);
int reply;
ftp.connect(ftpServer);
System.out.print(ftp.getReplyString());
reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
throw new RuntimeException("FTP server refused connection.");
}
boolean isOK = ftp.login(userName, password);
if (isOK) {
System.out.println(ftp.getReplyString());
} else {
throw new RuntimeException("Failed to login." + ftp.getReplyString());
}
reply = ftp.cwd("~/subspsync");
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
throw new RuntimeException("Failed to change working directory.reply:"
+ reply);
} else {
System.out.println(ftp.getReplyString());
}
ftp.setFileType(FTP.ASCII_FILE_TYPE);
}
public void upload(File file) throws Exception {
InputStream dataIn = new BufferedInputStream(new FileInputStream(file),
1024 * 8);
boolean isOK;
String dirName = file.getParentFile().getName();
String fileName = dirName + '/' + file.getName();
ByteArrayInputStream checkFileInputStream = new ByteArrayInputStream(
"".getBytes());
try {
if (!dirCreateMap.containsKey(dirName)) {
ftp.makeDirectory(dirName);
dirCreateMap.put(dirName, null);
}
try {
isOK = ftp.storeFile(fileName, dataIn);
} catch (IOException e) {
throw new RuntimeException("Failed to upload " + file, e);
}
if (isOK) {
ftp.storeFile(fileName + ".c", checkFileInputStream);
} else {
throw new RuntimeException("Failed to upload " + file + ",reply:" +
","+ ftp.getReplyString());
}
} finally {
dataIn.close();
}
}
public void disconnect() {
if (ftp.isConnected()) {
try {
ftp.disconnect();
} catch (IOException ioe) {
// 什么也不做
}
}
}
}

FTPClientUtil 类封装了 FTP 客户端,其构造方法是 private 修饰的,因此其他类无法通过 new 来生成相应的实例,而是通过其静态方法 newInstance 来获得实例。不过 newInstance 方法的返回值并不是一个 FTPClientUtil 实例,而是一个可以获取 FTPClientUtil 实例的凭据对象 java.util.concurrent.Future(具体说是 java.util.concurrent.FutureTask,它实现了 java.util.concurrent.Future 接口)实例。因此,FTPClientUtil 既相当于 Promise 模式中的 Promisor 参与者实例,又相当于 Result 参与者实例。而 newInstance 方法的返回值 java.util.concurrent.FutureTask 实例既相当于 Promise 参与者实例,又相当于 TaskExecutor 参与者实例:newInstance 方法的返回值 java.util.concurrent.FutureTask 实例不仅负责该方法真正处理结果(初始化完毕的 FTP 客户端实例)的存储和获取,还负责执行异步任务(调用 FTPClientUtil 实例的 init 方法),并设置任务的处理结果。

从如清单 6-2 所示的 Promise 客户端代码(DataSyncTask 类的 run 方法)来看,使用 Promise 模式的异步编程并没有本质上增加编程的复杂性:客户端代码的编写方式与同步编程并没有太大差别,唯一一点差别就是获取 FTP 客户端实例的时候多了一步对 java.util.concurrent.FutureTask 实例的 get 方法的调用。

Promise 模式的评价与实现考量

Promise 模式既发挥了异步编程的优势——增加系统的并发性,减少不必要的等待,又保持了同步编程的简单性:有关异步编程的细节,如创建新的线程或者提交任务到线程池执行等细节,都被封装在 Promisor 参与者实例中,而 Promise 的客户端代码则无须关心这些细节,其编码方式与同步编程并无本质上差别。这点正如清单 6-1 代码所展示的,客户端代码仅仅需要调用 FTPClientUtil 的 newInstance 静态方法,再调用其返回值的 get 方法,即可获得一个初始化完毕的 FTP 客户端实例。这本质上还是同步编程。当然,客户端代码也不能完全无视 Promise 模式的异步编程这一特性:为了减少客户端代码在调用 Promise 的 getResult 方法时出现阻塞的可能,客户端代码应该尽可能早地调用 Promisor 的异步方法,并尽可能晚地调用 Promise 的 getResult 方法。这当中间隔的时间可以由客户端代码用来执行其他操作,同时这段时间可以给 TaskExecutor 用于执行异步任务。

Promise 模式一定程度上屏蔽了异步、同步编程的差异。前文我们一直说 Promisor 对外暴露的 compute 方法是个异步方法。事实上,如果 compute 方法是一个同步方法,那么 Promise 模式的客户端代码的编写方式也是一样的。也就是说,无论 compute 方法是一个同步方法还是异步方法,Promise 客户端代码的编写方式都是一样的。例如,本章案例中 FTPClientUtil 的 newInstance 方法如果改成同步方法,我们只需要将其方法体中的语句 new Thread(task).start(); 改为 task.run(); 即可。而该案例中的其他代码无须更改。这就在一定程度上屏蔽了同步、异步编程的差异。而这可以给代码调试或者问题定位带来一定的便利。比如,我们的本意是要将 compute 方法设计成一个异步方法,但在调试代码的时候发现结果不对,那么我们可以尝试临时将其改为同步方法。若此时原先存在的问题不再出现,则说明问题是 compute 方法被编码为异步方法后所产生的多线程并发访问控制不正确导致的。

1. 异步方法的异常处理

如果 Promisor 的 compute 方法是个异步方法,那么客户端代码在调用完该方法后异步任务可能尚未开始执行。另外,异步任务运行在自己的线程中,而不是 compute 方法的调用方线程中。因此,异步任务执行过程中产生的异常无法在 compute 方法中抛出。为了让 Promise 模式的客户端代码能够捕获到异步任务执行过程中出现的异常,一个可行的办法是让 TaskExecutor 在执行任务捕获到异常后,将异常对象“记录”到 Promise 实例的一个专门的实例变量上,然后由 Promise 实例的 getResult 方法对该实例变量进行检查。若该实例变量的值不为 null,则 getResult 方法抛出异常。这样,Promise 模式的客户端代码通过捕获 getResult 方法抛出的异常即可“知道”异步任务执行过程中出现的异常。JDK 中提供的类 java.util.concurrent.FutureTask 就是采用这种方法对 compute 异步方法的异常进行处理的。

2. 轮询(Polling)

客户端代码对 Promise 的 getResult 的调用可能由于异步任务尚未执行完毕而阻塞,这实际上也是一种等待。虽然我们可以通过尽可能早地调用 compute 方法并尽可能晚地调用 getResult 方法来减少这种等待的可能性,但是它仍然可能会出现。某些场景下,我们可能根本不希望进行任何等待。此时,我们需要在调用 Promise 的 getResult 方法之前确保异步任务已经执行完毕。因此,Promise 需要暴露一个 isDone 方法用于检测异步任务是否已执行完毕。JDK 提供的类 java.util.concurrent.FutureTask 的 isDone 方法正是出于这种考虑,它允许我们在“适当”的时候才调用 Promise 的 getResult 方法(相当于 FutureTask 的 get 方法)。

3. 异步任务的执行

本章案例中,异步任务的执行我们是通过新建一个线程,由该线程去调用 TaskExecutor 的 run 方法来实现的(见清单 6-2)。这只是为了讨论方便。如果系统中同时存在多个线程调用 Promisor 的异步方法,而每个异步方法都启动了各自的线程去执行异步任务,这可能导致一个 JVM 中启动的线程数量过多,增加了线程调度的负担,从而反倒降低了系统的性能。因此,如果 Promise 模式的客户端并发量比较大,则需要考虑由线程池负责执行 TaskExecutor 的 run 方法来实现异步任务的执行。例如,如清单 6-2 所示的异步任务如果改用线程池去执行,我们只需要将代码改为类似如清单 6-3 所示的代码即可。

清单 6-3.用线程池执行异步任务

复制代码
public class FTPClientUtil {
private volatilestatic ThreadPoolExecutor threadPoolExecutor;
static {
threadPoolExecutor = new ThreadPoolExecutor(1,Runtime.getRuntime()
.availableProcessors() * 2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
}
private final FTPClient ftp = new FTPClient();
private final Map<String, Boolean> dirCreateMap = new HashMap<String, Boolean>();
// 私有构造器
private FTPClientUtil() {
}
public static Future<FTPClientUtil> newInstance(final String ftpServer,
final String userName, final String password) {
Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
@Override
public FTPClientUtil call() throws Exception {
FTPClientUtil self = new FTPClientUtil();
self.init(ftpServer, userName, password);
return self;
}
};
final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(
callable);
threadPoolExecutor.execute(task);
return task;
}
private void init(String ftpServer, String userName, String password)
throws Exception {
// 省略与清单 6-2 中相同的代码
}
public void upload(File file) throws Exception {
// 省略与清单 6-2 中相同的代码
}
public void disconnect() {
// 省略与清单 6-2 中相同的代码
}
}

Promise 模式的可复用实现代码

JDK1.5 开始提供的接口 java.util.concurrent.Future 可以看成是 Promise 模式中 Promise 参与者的抽象,其声明如下:

复制代码
public interface Future<V>

该接口的类型参数 V 相当于 Promise 模式中的 Result 参与者。该接口定义的方法及其与 Promise 参与者相关方法之间的对应关系如表 6-1 所示。

表 6-1.接口 java.util.concurrent.Future 与 Promise 参与者的对应关系

接口 java.util.concurrent.Future 的实现类 java.util.concurrent.FutureTask 可以看作 Promise 模式的 Promise 参与者实例。

如清单 6-2 所示的代码中的异步方法 newInstance 展示了如何使用 java.util.concurrent.FutureTask 来作为 Promise 参与者。

Java 标准库实例

JAX-WS 2.0 API 中用于支持调用 Web Service 的接口 javax.xml.ws.Dispatch 就使用了 Promise 模式。该接口用于异步调用 Web Service 的方法声明如下:

复制代码
Response<T>invokeAsync(T msg)

该方法不等对端服务器给响应就返回了(即实现了异步调用 Web Service),从而避免了 Web Service 客户端进行不必要的等待。而客户端需要其调用的 Web Service 的响应时,可以调用 invokeAsync 方法的返回值的相关方法来获取。invokeAsync 的返回值类型为 javax.xml.ws.Response,它继承自 java.util.concurrent.Future。因此,javax.xml.ws.Dispatch 相当于 Promise 模式中的 Promisor 参与者实例,其异步方法 invokeAsync(T msg) 的返回值相当于 Promise 参与者实例。

相关模式

1. Guarded Suspension 模式

Promise 模式的客户端代码调用 Promise 的 getResult 方法获取异步任务处理结果时,如果异步任务已经执行完毕,则该调用会直接返回。否则,该调用会阻塞直到异步任务处理结束或者出现异常。这种通过线程阻塞而进行的等待可以看作 Guarded Suspension 模式的一个实例。只不过,一般情况下 Promise 参与者我们可以直接使用 JDK 中提供的类 java.util.concurrent.FutureTask 来实现,而无须自行编码。关于 java.util.concurrent.FutureTask 如何实现通过阻塞去等待异步方法执行结束,感兴趣的读者可以去阅读 JDK 标准库的源码。

2. Active Object 模式

Active Object 模式可以看成是包含了 Promise 模式的复合模式。其 Proxy 参与者相当于 Promise 模式的 Promisor 参与者。Proxy 参与者的异步方法返回值相当于 Promise 模式的 Promise 参与者实例。Active Object 模式的 Scheduler 参与者相当于 Promise 模式的 TaskExecutor 参与者。

3. Master-Slave 模式

Master-Slave 模式中,Slave 参与者返回其对子任务的处理结果可能需要使用 Promise 模式。此时,Slave 参与者相当于 Promise 模式的 Promisor 参与者,其 subService 方法的返回值是一个 Promise 模式的 Promise 参与者实例。

4. Factory Method 模式

Promise 模式中的 Promisor 参与者可以看成是 Factory Method 模式的一个例子:Promisor 的异步方法可以看成一个工厂方法,该方法的返回值是一个 Promise 实例。

参考资源

  1. Mark Grand. Patterns in Java, Volume 1: A Catalog of Reusable Design Patterns Illustrated with UML, SecondEdition.Wiley, 2002.
  2. JDK 标准库源码.
  3. JAX-WS 2.0 介绍.
  4. Erich Gamma 等. 设计模式:可复用面向对象软件的基础(英文版). 机械工业出版社,2002.

书籍介绍

《Java 多线程编程实战指南(设计模式篇)》采用 Java(JDK1.6)语言和 UML 为描述语言,并结合作者多年工作经历的相关实战案例,介绍了多线程环境下常用设计模式的来龙去脉:各个设计模式是什么样的及其典型的实际应用场景、实际应用时需要注意的事项以及各个模式的可复用代码实现。