写点什么

蘑菇街千亿级消息 Kafka 上云实践

2021 年 1 月 13 日

蘑菇街千亿级消息Kafka上云实践

Apache Kafka 凭借其高吞吐、高可靠等特性在实时数据或流式数据架构中扮演着重要角色,受到了众多企业用户的青睐。但是随着云时代来临,公有云厂商纷纷推出消息队列服务,很多用户也逐渐从自建消息集群过渡到使用云上消息队列服务。本文将以蘑菇街 Kafka 服务迁移上云为例,阐述腾讯云消息队列 CKafka 如何对用户产生价值。


Apache Kafka  简介


Apache Kafka 官网用这样一句话描述最新版本的 Kafka:A distributed streaming platform。即分布式流式计算平台,并对其做了如下阐述:


Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.


译文:Kafka®用于构建实时数据管道和流式应用程序。它具有水平可伸缩性、容错性、超快速性,可在数千家公司中投入生产。


升级到 2.0+的 Kafka 给自身加了一层定义,即流计算平台。但是在企业级使用场景下,Kafka 还是被经常当作数据管道来使用,履行消息队列的基本职能。其典型的使用场景如下:  


  • 数据管道和系统解耦。

  • 异步处理和事件驱动。

  • 流量削峰。

  • 事务消息和分布式事务的最终一致性。



自建 Kafka 集群的痛点


由于 Kafka 的搭建方式简单方便,且其性能高效稳定,很多企业用户选择自建 Kafka 集群。但这样看似完美的可行方案背后却有一个隐型风险:当业务的消息数据量到达一定程度后,自建的消息队列集群就会引发各种各样的问题,那么如何解决问题呢?


我们都知道 Kafka 入门简单,进阶却有一定的门槛。解决问题的研发人员需要具备扎实的计算机功底(熟悉计算机网络、IO 等),并且对 Kafka 的底层原理、各种配置参数项等具有深刻理解,可以进行 Kafka 集群参数调优,快速处理突发故障、恢复集群抖动和动态进行集群扩缩容等。正因如此,引发了一些问题:企业一方面需要投入更多的人力、物力成本,另一方面需要时刻监控集群的健康状况,及时排除问题以保障业务的稳定运行。所以自建 Kafka 集群虽然简单,但需要承担日益加重的研发和运维成本。


蘑菇街上云背景


蘑菇街的业务场景和软件架构决定了它对 Kafka 有着强大的依赖,作为电商领域的佼佼者,其消息总量达到了日均千亿条,生产峰值带宽达每秒 GB 级别。其主要的业务场景为分布式大数据处理场景,如广告、交易、安全、离线处理等。


在意识到自建 Kafka 集群的痛点后,为了保证数据的安全性和集群的稳定性,蘑菇街选择使用云上消息队列服务 CKafka。CKafka 不仅支持多可用区容灾,还可以帮助客户实现冷热数据分离,解决频繁读取磁盘 IO 瓶颈,为业务的稳定运行提供良好的保障。接下来我们来分析阐述 CKafka 是如何做到可用区容灾和高性能的集群服务器 IO。


集群跨可用区容灾方案



在 Kafka 消息系统中,客户端感知服务端最核心的操作就是生产和消费。跨可用区容灾的目标是:当一个可用区发生故障(如火灾,断电等)时,能够做到客户端无感知的进行生产和消费,保证业务的稳定运行。而满足可用区容灾需要在技术层面解决如下问题(以上面示意图为例):


  • 分区副本的跨可用区分布,即保证每个分区的副本分布在不同可用区。比如,当集群跨上海二区和四区两个可用区时,分区有四个副本,则需要保证每个可用区都分布两个副本;

  • Kafka 强依赖 Apache Zookeeper,当 Zookeeper 不能正常提供服务时,Kafka 集群也会受到影响,故实现 kafka 的跨区容灾,也要实现 zookeeper 的跨区容灾。Apache Zookeeper 和 Kafka 一样,具有跨区容灾的能力。

  • Broker 节点的 IP 对客户端需要透明化。即客户端不能感知 Broker 的地址。这样当后端 Borker 故障,切换机器 IP 发生改变时,客户端无感知,依然可以正常运行。


解决上述问题需要下面 4 个技术方案。


1. 透明可漂移的 Broker 节点 IP


为什么 Broker 的节点 IP 和端口需要对用户端透明呢?我们先来看如下一段代码:

Properties props = new Properties(); props.put("bootstrap.servers", "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++)     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
复制代码


这是一段最简单的 Kafka Produce 代码。192.168.10.10:9092、192.168.10.11:9092、192.168.10.12:9092 是三台真实 Kafka Broker 的 IP 和端口获取 Server 端 Metadata 信息,开始进行生产消息操作。我们来设想一下如下的情况:


当其中一台 192.168.10.10 机器故障无法恢复时,我们重新启动了另外一台 Borker,比如 192.168.10.13:9092 提供服务。此时就需要通知所有客户端,将 Kafka 地址从: "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092" 

修改为 "192.168.10.13:9092,192.168.10.11:9092,192.168.10.12:9092"。


若 IP 配置硬编码到客户端代码中,则需要修改代码,然后打包并发布。由于服务端调整而导致客户端修改配置、重启,这简直是灾难!那要怎么解决这个问题呢?


解决问题的思路就是: Virtual IP Address。如下图所示,我们会在每台提供的 Broker 之前挂载一个四层的 Virtual IP(VIP)和 Virtual Port(VPORT),用户通过访问 VIP 和 VPORT 来访问实际的 Broker 服务。如 10.0.0.1:9092 对应的是真正的 Broker 服务 192.169.10.10.9092。这样就达到了实际 Broker IP 对用户透明化的目的。



那什么是漂移呢?服务需要做到跨可用区容灾。即我们提供的 Virtual IP  Address 能够在可用区之间进行切换的,当该可用区故障,该 VIP 可以迅速切换漂移至另一个可用区,继续提供服务。那么该 VIP 应该是可以访问所有可用区的。如下图,当上海可用区 2 发生故障后,Virtual Ip Service 迅速自动切换到上海可用区 1 可用的 broker 实例,保证客户端的正常使用。



2. 分区副本的跨区分布


原生的 Kafka 按照同一个可用区的副本不能分配在同一台机器上的原则,进行副本随机分配。副本分布逻辑是无感知可用区。即当集群里面哪台 broker 有空闲的空间,就将副本分布在 Broker 上。则有可能将同一个 partiton 的分区分布在同一个分区。


如上面的跨可用区 Virual IP 切换示意图所示,当创建一个 3 个 Replication(副本)的 Partition 时,很有可能该 Partition 的 Replication 都落在了上海可用区 2。如果此时上海可用区 2 发生故障,那么该 Partition 就不能正常提供服务,直接影响业务。怎么解决这个问题呢?


CKafka 会在 broker 上添加可用区标记,当发现客户创建的主题是跨可用区主题时,会将同一个分区的副本分配在多个可用区,保证一个可用区故障时分区仍然有存活的副本。通过修改 Kafka 源码的分区分配逻辑,添加了可用区标记逻辑,根据需求将不同的 Replicatiton 分配到不同的 Broker 上。而这些 Broker 则属于不同的可用区。实现原理如下:


首先来看一下 Zookeeper 上的节点/broker/topics/test-topic 的内容,内容如下:


{"version":1,"partitions":{"0":[10840,10839],"1":[10838,10840],"2":[10839,10838]}}
复制代码


这段内容意思是:test-topic 这个主题有 0、1、2 三个分区,0 分区分布在 broker[10840,10839]上,1 分区分布在 broker[10838,10840],依次类推。所以,只需要修改该内容的生成逻辑就可以控制 Partiton 的分布,即可实现该逻辑。


3. Zookeeper 的跨区部署


被 Kafka 强依赖的 Zookeeper 组件,它也需要跨区部署保证其可用性。首先来看一下 Zookeeper 的选举策略:半数以上的节点都同意后才能当选 leader,如果是偶数节点可能导致票数相同的情况,会使 leader 选取失败,最终导致集群失效。另外当 Zookeeper 集群故障节点数超过半数时,Zookeeper 集群将无法正常工作。


由 Zookeeper 分布式一致性算法的特点,可以得出一个结论:假如每个 zone 部署一个 zk 节点,zk 要支持 n 区容灾(同时挂掉 n 个区的 zk 节点),需要部署 2n+1 个分区才能保证 Zookeeper 的分区可用。即在 n=1 的情况下,需要部署 3 个可用区,才能保证 zookeeper 集群的单可用区可用。


4. Broker 配置优化


根据设计方案,在不同的可用区部署 Broker 时,需要调整一些参数。这些参数保证了服务跨区容灾的最大可用性。需要修改如下三个配置:


unclean.leader.election.enable=truemin.insync.replicas=1offsets.topic.replication.factor=3
复制代码


这三个配置什么意思呢? 依次来看一下:


  • unclean.leader.election.enable


官方描述:Indicates whether to enable replicas not in the ISR set to be selected as leader as a last resort, even though doing so may result in data loss。


解释:该字段默认值为 False。默认情况下 leader 不能从非 ISR 的副本列表里选择;因为在非 ISR 副本列表里选择 leader,很有可能会导致部分数据丢失。既然这样,那为什么还要打开这个字段呢?因为在很异常情况下,比如 ISR 内的副本都不可用了,此时如果该字段设置为 False,服务会直接挂掉;如果该字段设为 True,即允许从非 ISR 列表中选择 leader,那么服务尽管有可能丢失数据,却依然可以继续使用。所以这个参数必须参考业务特性来决定是否打开。


  • min.insync.replicas


官方描述:When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful。


解释:该字段默认值为 1。上述英文翻译为:表示当在 acks=-1 时,最少有一个 Replica 进行确认回执,才确认数据写入成功。这个参数在集群搭建时,为了保证数据的完整性,经常会被改为 2。这里改为 1 的原因是:在只有一个副本在工作 、其他都挂掉的极端情况下,保证客户端能够正常提供服务。如果设置为 2,当只有一个副本在工作的时候,就会出现生产端一直生产失败的情况,会影响业务。


  • offsets.topic.replication.factor


官网描述:The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.


解释:该值默认为 1。表示 kafka 的内部 topic consumer_offsets 副本数。当该副本所在的 broker 宕机,consumer_offsets 只有一份副本,该分区宕机。使用该分区存储消费分组 offset 位置的消费者均会收到影响,offset 无法提交,从而导致生产者可以发送消息但消费者不可用。所以需要设置该字段的值大于 1。


集群 IO 压力优化方案


自建消息集群的用户常常会遇到一个问题:在流量峰值时,集群 IO 压力很大,用户只能通过扩容来暂时解决问题。但这毕竟是权宜之计,为了帮助用户真正解决该问题,腾讯云 CKafka 团队对客户服务器端的各项指标及业务场景进行了深入分析。我们发现集群的 IO 压力占比最大的是磁盘读压力。但是为什么磁盘读压力大呢?我们首先来看一下 Kafka 底层的磁盘存储设计原理。


1. Kafka 磁盘存储设计原理


Kafka 的磁盘存储设计可以用三个词来概括:磁盘顺序读写、Page Cache 和零拷贝。


  • 磁盘顺序读写:即 Kafka 数据的写入和读取是顺序的。而根据局部性原理,在实际测试中,磁盘顺序写入和随机写入的性能比相差最大可达 6000 倍。

  • Page Cache:它是 Kafka 能够实现顺序读写的关键技术。另外,它也是操作系统主要实现的一种磁盘缓存,用来减少磁盘的 I/O 操作。具体做法是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Page Cache 中的数据会按照一定的策略更新到磁盘。

  • 零拷贝:将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。这样做大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux 操作系统而言,零拷贝技术依赖于底层的 sendfile() 方法实现。对于 Java 语言,FileChannal.transferTo() 方法的底层实现也是 sendfile() 方法。


2. 为什么服务器读压力大?



从上面的存储原理图来分析:理论上集群的读压力不应该这么大,因为大部分的读压力应该命中 Page Cache,不应该再从磁盘里面读取。然而实际情况中确实存在大量的磁盘读取行为。经过分析,客户存在多个业务消费同一份消息的业务场景,根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。


  • 实时消费者:对数据实时性要求较高,需要采用实时消费消息的方式。在实时消费的场景下,Kafka 会利用系统的 page cache 缓存,生产消息到 broker,然后直接从内存转发给实时消费者,磁盘压力为零。通常称上述操作为热读,常见的业务场景有广告、推荐等。

  • 离线消费者:又名定时周期性消费者,消费的消息通常是数分钟前或是数小时前的消息。而这类消息通常存储在磁盘中,消费时会触发磁盘的 IO 操作。通常称其为冷读,适合报表计算、批量计算等周期性执行的业务场景。


在消息量非常大的情况下,实时和离线消费者同时消费一个集群,会导致两个问题:


  • 实时消费者受到离线消费者影响:由于离线消费者消费,导致落盘数据和实时数据会频繁的换入换出内存,直接影响实时业务的实时性,增加实时业务的响应时延;

  • 离线数据会导致繁重的磁盘 IO 操作:当离线任务读取的数据量非常大时,会触发磁盘的高 IO,磁盘的 IO util 甚至达到 100%,影响集群的稳定性。


3. 优化之道:冷热数据分离方案


针对用户集群中存在的数据冷读和热读并存问题,我们认为将集群的数据进行冷热数据分离是当前较优的解决方案。而在不改变生产端行为的情况下,怎么对冷热数据进行分离呢?腾讯云 CKafka 推出了基于开源 Kafka Connector 的数据同步服务来解决上述问题。架构图如下图所示:



broker 集群被拆分为实时集群和离线集群。两个集群分别负责同时引导离线业务消费离线集群。CKafka 在两个集群中间添加了 connector 集群。connector 集群将离线业务订阅的消息(按照主题维度同步)从实时集群同步到离线集群中,connector 集群实时进行数据同步,和实时消费者保持一致。这样操作不仅对磁盘 IO 没有影响,也不会对其他的实时消费者造成影响。


CKafka 对业务的价值


CKafka 提供高吞吐性能、高可扩展性的消息队列服务。在性能、扩展性、业务安全保障、运维等方面具有超强优势,让用户在享受低成本、超强功能的同时,免除繁琐运维工作。



头图:Unsplash

作者:张晓宇, 许文强

原文:https://mp.weixin.qq.com/s/89zOy63MjDfyJnLhY8CkUA

原文:蘑菇街千亿级消息 Kafka 上云实践

来源:腾讯云中间件 - 微信公众号 [ID:gh_6ea1bc2dd5fd]

转载:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2021 年 1 月 13 日 22:10994

评论

发布
暂无评论
发现更多内容

LeetCode题解:169. 多数元素,分治,JavaScript,详细注释

Lee Chen

算法 LeetCode 前端进阶训练营

快三走势分析判断技巧《走势图预测》

陈北

SELinux 安全上下文配置

moon不讲武德!!!一个类加载机制给面试官说蒙了!!

moon聊技术

Java JVM 类加载 类加载器

About Me

翎君

android

【JAVA】List转换为array

莫问

在线K歌的发展和优势

anyRTC开发者

音视频 WebRTC RTC sdk

如何基于App SDK快速地开发一个IoT App?

IoT云工坊

App 物联网 sdk 智能家居

马士兵最新2020涵盖P5—P8Java全栈架构师学习路线,跟着老师学我已拿P7Offer!

Java架构追梦

Java 学习 架构 面试 马士兵

【薪火计划】05 - 坦诚是领导力的根基

brave heart

管理

英特尔与南京溧水经济技术开发区共同成立智能交通研究院

intel001

程序员面试的时候突然遇到答不上的问题怎么办?

Java架构师迁哥

市值管理软件,交易平台挂单机器人,做市机器人

WX13823153201

市值管理软件

基于DAYU的实时作业开发,分分钟搭建企业个性化推荐平台

华为云开发者社区

华为 算法 数据 dayu

什么是堡垒机?为什么需要堡垒机?

xcbeyond

运维

申通快递 双11 云原生应用实践

阿里巴巴云原生

阿里云 Kubernetes 运维 云原生 监控

我是面试官,我来分享一波面经!看看我的内心OS

比伯

Java 编程 架构 面试 技术宅

视频作品播放量低:自媒体作者如何走出新手村

石头IT视角

如何用CSS实现图像替换链接文本显示并保证链接可点击

陈北

CSS小技巧

有奖话题 | 如果程序员和产品经理都会凡尔赛文学,将如何对话?

YourBatman

话题讨论 凡尔赛文学

快三高手预测《走势技巧易懂教学》

陈北

jdk

市值管理机器人、自动跑k线机器人开发

t13823115967

市值管理机器人 自动跑k线机器人开发

云原生应用Go语言:你还在考虑的时候,别人已经应用实践

华为云开发者社区

go 微服务 云技术

Python进阶——什么是元类?

Kaito

Python

30分钟开发一款抓取网站图片资源的浏览器插件

徐小夕

Java chrome 前端 前端进阶 chrome扩展

双指针算法和位运算&离散化和区间合并

落曦

快三大小最简单的方法倍投盈利

陈北

geohash

区块链应用场景有哪些?区块链应用开发

t13823115967

区块链应用场景有哪些 区块链应用开发

面试JVM一问三不知??来看看这个

程序员的时光

JVM Java虚拟机

大厂经验:埋点数据质量之埋点验证

阿亮

埋点 数据验证

Web前端如何实现断点续传

QiyihaoLabs

Web 断点续传 upload pl

彻底搞懂 IO 底层原理

vivo互联网技术

Java Netty 服务器 语法

蘑菇街千亿级消息Kafka上云实践-InfoQ