阿里云「飞天发布时刻」2024来啦!新产品、新特性、新能力、新方案,等你来探~ 了解详情
写点什么

聊聊并发——生产者消费者模式

  • 2014-04-24
  • 本文字数:4885 字

    阅读完需:约 16 分钟

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

生产者消费者模式实战

我和同事一起利用业余时间开发的 Yuna 工具中使用了生产者和消费者模式。首先我先介绍下 Yuna 工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,同学们在网上看到好的技术文章,复制粘贴发送就完成了分享,但是我们发现技术文章不能沉淀下来,对于新来的同学看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这问题,我们开发了 Yuna 工具。Yuna 取名自我喜欢的一款游戏最终幻想里的女主角。

首先我们申请了一个专门用来收集分享邮件的邮箱,比如 share@alibaba.com,同学将分享的文章发送到这个邮箱,让同学们每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同学只需要象以前一样向整个部门分享文章就行,Yuna 工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如[内贸技术分享],下载完邮件之后,通过 confluence 的 web service 接口,把文章插入到 confluence 里,这样新同事就可以在 confluence 里看以前分享过的文章,并且 Yuna 工具还可以自动把文章进行分类和归档。

为了快速上线该功能,当时我们花了三天业余时间快速开发了 Yuna1.0 版本。在 1.0 版本中我并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下:

复制代码
public void extract() {
logger.debug(" 开始 " + getExtractorName() + "。。");
// 抽取邮件
List<Article> articles = extractEmail();
// 添加文章
for (Article article : articles) {
addArticleOrComment(article);
}
// 清空邮件
cleanEmail();
logger.debug(" 完成 " + getExtractorName() + "。。");
}

Yuna 工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna 是每隔 5 分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在 Yuna2.0 版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到 conflunce 里。代码如下:

复制代码
public class QuickEmailToWikiExtractor extends AbstractExtractor {
private ThreadPoolExecutor threadsPool;
private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;
public QuickEmailToWikiExtractor() {
emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000));
}
public void extract() {
logger.debug(" 开始 " + getExtractorName() + "。。");
long start = System.currentTimeMillis();
// 抽取所有邮件放到队列里
new ExtractEmailTask().start();
// 把队列里的文章插入到 Wiki
insertToWiki();
long end = System.currentTimeMillis();
double cost = (end - start) / 1000;
logger.debug(" 完成 " + getExtractorName() + ", 花费时间:" + cost + " 秒 ");
}
/**
* 把队列里的文章插入到 Wiki
*/
private void insertToWiki() {
// 登录 wiki, 每间隔一段时间需要登录一次
confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);
while (true) {
//2 秒内取不到就退出
ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
if (email == null) {
break;
}
threadsPool.submit(new insertToWikiTask(email));
}
}
protected List<Article> extractEmail() {
List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
if (allEmails == null) {
return null;
}
for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
emailQueue.offer(exchangeEmailShallowDTO);
}
return null;
}
/**
* 抽取邮件任务
*
* @author tengfei.fangtf
*/
public class ExtractEmailTask extends Thread {
public void run() {
extractEmail();
}
}
}

多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,所以我们可以使用多个线程来生产数据,同样可以使用多个消费线程来消费数据。而更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:

我们在一个长连接服务器中使用了这种模式,生产者 1 负责将所有客户端发送的消息存放在阻塞队列 1 里,消费者 1 从队列里读消息,然后通过消息 ID 进行 hash 得到 N 个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者 2 无法消费消息,就将消息再抛回到阻塞队列 1 中,交给其他消费者处理。

以下是消息总队列的代码;

复制代码
/**
* 总消息队列管理
*
* @author tengfei.fangtf
*/
public class MsgQueueManager implements IMsgQueue{
private static final Logger LOGGER
= LoggerFactory.getLogger(MsgQueueManager.class);
/**
* 消息总队列
*/
public final BlockingQueue<Message> messageQueue;
private MsgQueueManager() {
messageQueue = new LinkedTransferQueue<Message>();
}
public void put(Message msg) {
try {
messageQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public Message take() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
}

启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息。

复制代码
/**
* 分发消息,负责把消息从大队列塞到小队列里
*
* @author tengfei.fangtf
*/
static class DispatchMessageTask implements Runnable {
@Override
public void run() {
BlockingQueue<Message> subQueue;
for (;;) {
// 如果没有数据,则阻塞在这里
Message msg = MsgQueueFactory.getMessageQueue().take();
// 如果为空,则表示没有 Session 机器连接上来,
需要等待,直到有 Session 机器连接上来
while ((subQueue = getInstance().getSubQueue()) == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 把消息放到小队列里
try {
subQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

使用 Hash 算法获取一个子队列。

复制代码
/**
* 均衡获取一个子队列。
*
* @return
*/
public BlockingQueue<Message> getSubQueue() {
int errorCount = 0;
for (;;) {
if (subMsgQueues.isEmpty()) {
return null;
}
int index = (int) (System.nanoTime() % subMsgQueues.size());
try {
return subMsgQueues.get(index);
} catch (Exception e) {
// 出现错误表示,在获取队列大小之后,队列进行了一次删除操作
LOGGER.error(" 获取子队列出现错误 ", e);
if ((++errorCount) < 3) {
continue;
}
}
}
}

使用的时候我们只需要往总队列里发消息。

复制代码
// 往消息队列里添加一条消息
IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);
messageQueue.put(msg);

小结

本章讲解了生产者消费者模式,并给出了实例。读者可以在平时的工作中思考下哪些场景可以使用生产者消费者模式,我相信这种场景应该非常之多,特别是需要处理任务时间比较长的场景,比如上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,然后立刻返回告诉用户上传成功,最后消费者再去队列里取出文件处理。比如调用一个远程接口查询数据,如果远程服务接口查询时需要几十秒的时间,那么它可以提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,然后该接口立刻返回。然后服务器端用线程轮询并获取申请任务进行处理,处理完之后发消息给调用方,让调用方再来调用另外一个接口拿数据。

另外 Java 中的线程池类其实就是一种生产者和消费者模式的实现方式,但是实现方法更高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

我们的系统也可以使用线程池来实现多生产者消费者模式。比如创建 N 个不同规模的 Java 线程池来处理不同性质的任务,比如线程池 1 将数据读到内存之后,交给线程池 2 里的线程继续处理压缩数据。线程池 1 主要处理 IO 密集型任务,线程池 2 主要处理 CPU 密集型任务。


感谢张龙对本文的审校,感谢张龙对本文的策划。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2014-04-24 20:1033235

评论 1 条评论

发布
用户头像
2021-03-03 17:24
回复
没有更多了
发现更多内容

微博评论高性能高可用计算架构

艾瑾行

架构训练营

it资产管理软件哪个好?既好用又安全?

行云管家

运维 IT运维 IT资产 IT资产管理

DTCC 2023,8月16日北京见!

KaiwuDB

KaiwuDB DTCC 2023

安徽阜阳是几线城市?有正规等级保护测评机构吗?

行云管家

等保 等级保护 等保测评机构 阜阳

融云荣获「2023 中国数字生态通信领军企业」奖

融云 RongCloud

互联网 通信 数字 融云 AIGC

如何快速完成PostgreSQL数据迁移?|NineData

NineData

postgresql 数据迁移 不停机发布 NineData 结构迁移

uiautomator2 自动化测试工具使用

霍格沃兹测试开发学社

10个微服务设计模式

越长大越悲伤

微服务 微服务设计

如何基于 ACK Serverless 快速部署 AI 推理服务

阿里巴巴云原生

阿里云 Serverless 容器 云原生 Serverless Kubernetes

小灯塔系列-中小企业数字化转型系列研究——项目管理测评报告

向量智库

数仓中典型的几种不下推语句整改案例

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 8 月 PK 榜

Linux系统安装和使用Kafka教程。

百度搜索:蓝易云

云计算 kafka Linux 运维 云服务器

“一日之际在于晨”,欢迎莅临WAVE SUMMIT上午场:Arm 虚拟硬件早餐交流会

飞桨PaddlePaddle

人工智能 paddle 百度飞桨 硬件生态

LED电子显示屏幕如何计算它的面积

Dylan

广告 交通 LED显示屏 全彩LED显示屏 体育

TiDB v7.1.0 跨业务系统多租户解决方案

PingCAP

MySQL 数据库 多租户 TiDB

华为携手华中地区5大高校倡议共建湖北省高性能计算研究院建设

彭飞

Syncovery for mac(文件备份和同步工具) 10.6.12激活版

mac

苹果mac Windows软件 Syncovery 文件同步和备份软件

小灯塔系列-中小企业数字化转型系列研究——任务管理测评报告

向量智库

HyperDock for Mac(mac窗口调整工具)v1.8.0.10中文激活版

mac

苹果mac Windows软件下载 HyperDock 窗口调整工具

开源软件下游分发合规性讨论 ——“心寄源”法律沙龙(2023第四期 | 总第九期)成功召开

开放原子开源基金会

开源

万字干货分享 | 阿里云CIPU技术解析

阿里云弹性计算

Linux 中的 su 和 sudo 命令有什么区别?

百度搜索:蓝易云

云计算 Linux 运维 sudo su

Docker容器安装Nginx教程。

百度搜索:蓝易云

nginx 云计算 Linux 容器 运维

教你如何搭建K8S集群。

百度搜索:蓝易云

云计算 Kubernetes 运维 k8s 集群

制造执行系统(MES)在新能源领域的应用

万界星空科技

新能源 新能源行业

校源行丨开放原子开源基金会赴厦门大学访问交流

开放原子开源基金会

开源 校源行

Chrome 浏览器+Postman做接口测试 ?

霍格沃兹测试开发学社

大模型时代,如何重塑AI人才的培养?知名高校专家为您解答

飞桨PaddlePaddle

人工智能 paddle 百度飞桨

小灯塔系列-中小企业数字化转型系列研究——企业网盘测评报告

向量智库

全链路Trace全量存储-重造索引

乘云 DataBuff

TiDB Bot:用 Generative AI 构建企业专属的用户助手机器人

PingCAP

人工智能 数据库 AI TiDB

聊聊并发——生产者消费者模式_语言 & 开发_方腾飞_InfoQ精选文章