【QCon】精华内容上线85%,全面覆盖“人工智能+”的典型案例!>>> 了解详情
写点什么

去哪儿网消息队列设计与实现

  • 2018-12-07
  • 本文字数:5027 字

    阅读完需:约 16 分钟

去哪儿网消息队列设计与实现

去哪儿网近日在GitHub上开源了其内部广泛使用的消息队列(内部代号 QMQ),本文从去哪儿网使用消息队列所碰到的各种问题出发探讨去哪儿网消息队列的设计与实现。

背景

2012 年,随着公司业务的快速增长,公司当时的单体应用架构很难满足业务快速增长的要求,和其他很多公司一样,去哪儿网也开始了服务化改造,按照业务等要素将原来庞大的单体应用拆分成不同的服务。那么在进行服务化改造之前首先就是面临是服务化基础设施的技术选型,其中最重要的就是服务之间的通信中间件。一般来讲服务之间的通信可以分为同步方式和异步方式。同步的方式的代表就是 RPC,我们选择了当时还在活跃开发的 Alibaba Dubbo(在之后 Dubbo 官方停止了开发,但是最近 Dubbo 项目又重新启动了)。


异步方式的代表就是消息队列(Message Queue),MQ 在当时也有很多开源的选择:RabbitMQ, ActiveMQ, Kafka, MetaQ(RocketMQ 的前身)。首先因为技术栈我们排除了 erlang 开发的 RabbitMQ,而 Kafka 以及 Java 版 Kafka 的 MetaQ 在当时还并不成熟和稳定。而 ActiveMQ 在去哪儿网已经有很多应用在使用了,但是使用过程中并不一帆风顺:宕机,消息丢失,消息堵塞等问题屡见不鲜,而且 ActiveMQ 发展多年,代码也非常复杂,想要完全把控也不容易,所以我们决定自己造一个轮子。

问题

如果我们要在服务化拆分中使用消息队列,那么我们需要解决哪些问题呢?首先去哪儿网提供了旅游产品在线预订服务,那么就涉及电商交易,在电商交易中我们认为数据的一致性是非常关键的要素。那么我们的 MQ 必须提供一致性保证。


MQ 提供一致性保证又分为两个方面:发消息时我们如何确保业务操作和发消息是一致的,也就是不能出现业务操作成功消息未发出或者消息发出了但是业务并没有成功的情况。举例来说,支付服务使用消息通知出票服务,那么不能出现支付成功,但是消息没有发出,这会引起用户投诉;但是也不能出现支付未成功,但是消息发出最后出票了,这会导致公司损失。总结一下就是发消息和业务需要有事务保证。一致性的另一端是消费者,比如消费者临时出错或网络故障,我们如何确保消息最终被处理了。那么我们通过消费 ACK 和重试来达到最终一致性。


而服务端设计,在当时我们考虑的并不多,我们原计划只在交易环节中使用自己开发的 MQ,经过对未来数据的预估我们选择数据库作为 MQ Server 的消息存储,但是随着 MQ 在各系统中的大量应用,就不仅限于交易场景了,而且大家都期望所有场景中只使用一套 API,所以后来消息量迅速增长,迫使我们对存储模型进行了重新设计。再加上旅游产品预定的特征,大部分预定都是未来某个时间点的,这个时间可长可短,短的话可能是几个小时,长的话可能是半年以上,那么我们对延时消息的需求也很强烈。那么这种延时时间不固定的方式也对服务端设计提出了挑战。


接下来本文会从客户端一致性设计和服务端存储模型两方面进行讨论。

客户端一致性

提到一致性,大家肯定就想到事务,而一提到事务,肯定就想到关系型数据库,那么我们是不是可以借助关系型 DB 里久经考验的事务来实现这个一致性呢。我们以 MySQL 为例,对于 MySQL 中同一个实例里面的 db,如果共享相同的 Connection 的话是可以在同一个事务里的。以下图为例,我们有一个 MySQL 实例监听在 3306 端口上,然后该实例上有 A,B 两个 DB,那么下面的伪代码是可以跑在同一个事务里的:



begin transactioninsert into A.tbl1(name, age) values('admin', 18);insert into B.tbl2(num) values(20);end transaction
复制代码


有了这层保证,我们就可以透明的实现业务操作和消息发送在同一个事务里了,首先我们在公司所有 MySQL 实例里初始化出一个 message db,这个可以放到自动化流程中,对应用透明。然后我们只要将发消息与业务操作放到同一个 DB 事务里即可。


我们来看一个实际的场景:在支付场景中,支付成功后我们需要插入一条支付流水,并且发送一条支付完成的消息通知其他系统。那么这里插入支付流水和发送消息就需要是一致的,任何一步没有成功最后都会导致问题。那么就有下面的代码:


@Transactionalpublic void pay(Order order){    PayTransaction t = buildPayTransaction(order);    payDao.append(t);    producer.sendMessage(buildMessage(t));}
复制代码


上面的代码可以用下面的伪代码解释:


@Transactionalpublic void pay(Order order){    PayTransaction t = buildPayTransaction(order);    payDao.append(t);    //producer.sendMessage(buildMessage(t));    final Message message = buildMessage(t);    messageDao.insert(message);    //在事务提交后执行    triggerAfterTransactionCommit(()->{        messageClient.send(message);        messageDao.delete(message);    });}
复制代码


实际上在 producer.sendMessage 执行的时候,消息并没有通过网络发送出去,而仅仅是往业务 DB 同一个实例上的消息库插入了一条记录,然后注册事务的回调,在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到 server 成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从消息库里捞出然后重新发送,直到发送成功。整个流程就如下图所示:



  1. begin tx 开启本地事务

  2. do work 执行业务操作

  3. insert message 向同实例消息库插入消息

  4. end tx 事务提交

  5. send message 网络向 server 发送消息

  6. reponse server 回应消息

  7. delete message 如果 server 回复成功则删除消息

  8. scan messages 补偿任务扫描未发送消息

  9. send message 补偿任务补偿消息

  10. delete messages 补偿任务删除补偿成功的消息

服务端存储模型

分析了客户端为了一致性所作的设计后,我们再来看看服务端的存储设计。我会从两个方面来介绍:类似 Kafka 之类的基于 partition 存储模型有什么问题,以及我们是如何解决的。

基于 partition 存储模型的问题

我们都知道 Kafka 和 RocketMQ 都是基于 partition 的存储模型,也就是每个 subject 分为一个或多个 partition,而 Server 收到消息后将其分发到某个 partition 上,而 Consumer 消费的时候是与 partition 对应的。比如,我们某个 subject a 分配了 3 个 partition(p1, p2, p3),有 3 个消费者(c1, c2, c3)消费该消息,则会建立 c1 - p1, c2 - p2, c3 - p3 这样的消费关系。



那么如果我们的 consumer 个数比 partition 个数多呢?则有的 consumer 会是空闲的。



而如果 partition 个数比 consumer 个数多呢?则可能存在有的 consumer 消费的 partition 个数会比其他的 consumer 多的情况。



那么合理的分配策略只有是 partition 个数与 consumer 个数成倍数关系。


以上都是基于 partition 的 MQ 所带来的负载均衡问题。因为这种静态的绑定的关系,还会导致 Consumer 扩容缩容麻烦。也就是使用 Kafka 或者 RocketMQ 这种基于 partition 的消息队列时,如果遇到处理速度跟不上时,光简单的增加 Consumer 并不能马上提高处理能力,需要对应的增加 partition 个数,而特别在 Kafka 里 partition 是一个比较重的资源,增加太多 parition 还需要考虑整个集群的处理能力;当高峰期过了之后,如果想缩容 Consumer 也比较麻烦,因为 partition 只能增加,不能减少。


跟扩容相关的另外一个问题是,已经堆积的消息是不能快速消费的。比如开始的时候我们分配了 2 个 partition,由 2 个 Consumer 来消费,但是突然发送方大量发送消息(这个在日常运维中经常遇到),导致消息快速的堆积,这个时候我们如何能快速扩容消费这些消息呢?其实增加 partition 和 Consumer 都是没有用的,增加的 Consumer 爱莫能助,因为堆积的那 3 个 partition 只能由 2 个 Consumer 来消费,这个时候你只能纵向扩展,而不能横向扩展,而我们都知道纵向扩展很多时候是不现实的,或者执行比较重的再均衡操作。


去哪儿消息队列存储模型

上面已经介绍了基于 partition 的存储模型存在的问题,那么这些问题对于我们是问题吗?或者我们的场景是否能克服这些问题呢?


现在去哪儿网的系统架构基本上都是消息驱动的,也就是绝大多数业务流程都是靠消息来驱动,那么这样的架构有什么特征呢:


  • 消息主题特别多 现在生产上已有 4W+消息主题。这是业务中使用的消息与数据流处理中使用的最大的不同,数据流中一般消息主题少,但是每个消息主题的吞吐量都极大。而业务中的消息是主题极多,但是有很多主题他的量是极小的。

  • 消息消费的扇出大 也就是一个消息主题有几十个甚至上百个不同的应用订阅是非常常见的。以去哪儿酒店订单状态变更的消息为例,目前有将近 70 多个不同的系统来订阅这个消息。


结合前面对基于 partition 的存储模型的讨论,我们觉得这种存储模型不太容易符合我们的需求。


虽然我们并不想采用基于 partition 的存储模型,但是 Kafka 和 RocketMQ 里很多设计我们还是可以学习的:


  • 顺序 append 文件,提供很好的性能

  • 顺序消费文件,使用 offset 表示消费进度,不用给每个消息记录消费状态,成本极低

  • 将所有 subject 的消息合并在一起,减少 parition 数量,单一集群可以支撑更多的 subject(RocketMQ)


在设计 QMQ 的存储模型时,觉得这几点是非常重要的。那如何在不使用基于 partition 的情况下,又能得到这些特性呢?正所谓有前辈大师说:计算机中所有问题都可以通过添加一个中间层来解决,一个中间层解决不了那就添加两个。


我们通过添加一层拉取的 log(pull log)来动态映射 consumer 与 partition 的逻辑关系,这样不仅解决了 consumer 的动态扩容缩容问题,还可以继续使用一个 offset 表示消费进度。而 pull log 与 consumer 一一对应。


下图是 QMQ 的存储模型



先解释一下上图中的数字的意义。上图中方框上方的数字,表示该方框在自己 log 中的偏移,而方框内的数字是该项的内容。比如 message log 方框上方的数字:3,6,9 表示这几条消息在 message log 中的偏移。而 consume log 中方框内的数字 3,6,9,20 正对应着 messzge log 的偏移,表示这几个位置上的消息都是 topic1 的消息,consume log 方框上方的 1,2,3,4 表示这几个方框在 consume log 中的逻辑偏移。下面的 pull log 方框内的内容对应着 consume log 的逻辑偏移,而 pull log 方框外的数字表示 pull log 的逻辑偏移。


这样存储中有三种重要的 log:


  • message log 所有 subject 的消息进入该 log,消息的主存储

  • consume log consume log 存储的是 message log 的索引信息

  • pull log 每个 consumer 拉取消息的时候会产生 pull log,pull log 记录的是拉取的消息在 consume log 中的 sequence


那么消费者就可以使用 pull log 上的 sequence 来表示消费进度,这样一来我们就解耦了 consumer 与 partition 之间的耦合关系,两者可以任意的扩展。

延迟消息队列存储模型

除了对实时消息的支持,QMQ 还支持了任意时间的延时消息,在开源版本的 RocektMQ 里提供了多种固定延迟 level 的延时消息支持,也就是你可以发送几个固定的延时时间的延时消息,比如延时 10s, 30s…,但是基于我们现有的业务特征,我们觉得这种不同延时 level 的延时消息并不能满足我们的需要,我们更多的是需要任意时间延时。在 OTA 场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时 level 来实现这个场景。


我们的延时/定时消息使用的是两层 hash wheel timer 来实现的。第一层位于磁盘上,每个小时为一个刻度,每个刻度会生成一个日志文件,根据业务特征,我们觉得支持两年内任意时间延时就够了,那么最多会生成 2 * 366 * 24 = 17568 个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(偏移量,投递时间等)从磁盘文件加载到内存中的 hash wheel timer 上。



在延时/定时消息里也存在三种 log:


  • message log 和实时消息里的 message log 类似,收到消息后 append 到该 log,append 成功后就立即返回

  • schedule log 按照投递时间组织,每个小时一个。该 log 是回放 message log 后根据延时时间放置对应的 log 上,这是上面描述的两层 hash wheel timer 的第一层,位于磁盘上

  • dispatch log 延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递

总结

消息队列是构建微服务架构很关键的基础设施,本文根据我们自己的实际使用场景,并且对目前市面上一些活跃的开源产品进行对比,提出去哪儿网的消息队列的设计与实现。本文只是从宏观架构上做出一些探讨,具体的实现细节也有很多有趣的地方,目前去哪儿网的消息队列 QMQ 也已经在GitHub上开源,欢迎大家试用,欢迎 PR,谢谢。


2018-12-07 15:5039098

评论 23 条评论

发布
用户头像
ActiveMQ:「宕机,消息丢失,消息堵塞等问题屡见不鲜,而且 ActiveMQ 发展多年,代码也非常复杂」。确实
2018-12-16 17:25
回复
用户头像
受益匪浅,一致很困扰业务数据入库和发送消息之间的不一致问题。
2018-12-12 16:59
回复
用户头像
Qmq 增多了日志文件的磁盘写入次数,性能上应该会有影响吧,存储成本也要增加,另外日志支持多副本不?
2018-12-10 20:27
回复
稍微有些影响,pull log是非常小的,一个消息就是一个long,8个字节,另外pull log是不需要同步刷盘的,所以影响很小。采用的是主从复制的方式
2018-12-10 21:04
回复
用户头像
可以可以,活用手上的资源,解决业务问题。
2018-12-10 16:29
回复
用户头像
赞,后面研究下
2018-12-09 10:52
回复
用户头像
Qmq的确很好用啊,赞
2018-12-08 11:39
回复
用户头像
🐂
2018-12-08 09:55
回复
用户头像
这个pull log 感觉很傻比。。。。partition 难道不是为了解决磁盘读写速度的问题?
2018-12-08 09:28
回复
pull log为啥傻比呢..
partition又是如何解决磁盘读写速度的?
2018-12-08 11:13
回复
partition应该是来解决并发能力吧,顺序存储解决磁盘读写,不过现在kafka有了consumer group,感觉比这个好用些现在
2018-12-10 10:47
回复
感觉你都没看文章啊
2018-12-10 11:25
回复
查看更多回复
用户头像
6666
2018-12-08 08:28
回复
用户头像
得益于开源,回馈社区,Q在开源的路上稳步前行
2018-12-08 08:06
回复
用户头像
赞赞赞,厉害
2018-12-08 00:18
回复
用户头像
2018-12-07 23:18
回复
用户头像
厉害了,领教
2018-12-07 23:12
回复
用户头像
厉害了,领教
2018-12-07 23:12
回复
用户头像
延时发消息的功能可以代替部分定时job功能,很好用
2018-12-07 20:29
回复
用户头像
12年就有这种思想和设计,还是很强的。另外activemq确实太坑了
2018-12-07 19:05
回复
没有更多了
发现更多内容

遗留代码处理技巧与案例演示

京东科技开发者

数据结构 重构 代码重构 遗留代码 耦合

Git本地提交代码推送远程并未统计贡献量问题分析

Andy

云BI,如何成为了企业的“贴身管家”?

夏日星河

线上直播 | 未来金融研究所——以应用为中心,重塑金融研发效率

CODING DevOps

云原生 金融

大学生想进大厂是通过自学还是java培训

小谷哥

如何实现对象存储?

MatrixOrigin

数据库 分布式数据库 对象存储 MatrixOrigin MatrixOne

链表只有面试有用?Redis 之父说:我不同意!

图灵教育

算法 链表 Redis 数据结构

react-Suspense工作原理分析

夏天的味道123

React

React高级特性之Context

夏天的味道123

React

「Go工具箱」web中想做到cookie值安全?securecookie库的使用和实现原理

Go学堂

golang 开源 程序员 Cookie WEB安全

融云「百幄」之视频会议和直播,让办公桌无限延伸

融云 RongCloud

直播 视频会议 通讯

LED显示屏设计和安装比例有什么联系

Dylan

LED显示屏 户外LED显示屏 led显示屏厂家

火山引擎钜惠双11开启,云服务器0.71折起

Geek_2d6073

CRAFTS:端对端的场景文本检测器

合合技术团队

人工智能 深度学习 文字识别 端口 文本检测

云资源管理平台有哪些?重点推荐哪家?

行云管家

云计算 云服务 云资源 云管理

Nydus | 容器镜像基础

SOFAStack

Nydus

工业互联网新引擎——灵雀云 × 英特尔 5G融合边缘云解决方案

York

云原生 5G 边缘计算 架构设计 云边端协同

MindStudio模型训练场景精度比对全流程和结果分析

华为云开发者联盟

人工智能 华为云 企业号十月 PK 榜

线上kafka消息堆积,consumer掉线,怎么办?

Java永远的神

Java kafka 程序员 程序人生 消息中间件

React组件通信

xiaofeng

React

CSS 如何实现五彩斑斓的“呼吸字”?速度拿去装杯!

掘金安东尼

CSS 11月月更

女生参加前端培训,学习不如男生吗?

小谷哥

湘潭等级测评机构有哪些?排名是怎样?

行云管家

等保 等级保护 等保测评 等保测评机构

React高级特性之Render Props

夏天的味道123

React

大咖圆桌|研发想要降本增效?来听听专家们的前沿洞见

万事ONES

react组件深度解读

xiaofeng

React

开发培训学习后工作好找吗?

小谷哥

深圳哪所前端培训机构比较靠谱

小谷哥

react进阶用法完全指南

xiaofeng

React

大数据培训学习合适吗?

小谷哥

豆瓣评分8.0!深入理解Java虚拟机,把GC算法与实现讲得明明白白!

Java永远的神

程序员 面试 JVM GC Java虚拟机

去哪儿网消息队列设计与实现_架构_余昭辉_InfoQ精选文章