Uber 的 Kafka 实践:踩坑 5 年,随时像替换汽车引擎一样替换 Kafka

发布于:2020 年 4 月 17 日 18:00

Uber的Kafka实践:踩坑5年,随时像替换汽车引擎一样替换Kafka

Uber 从 2013 年开始使用 Kafka。Uber 的主流应用,如打车、外卖等服务,都需要实时处理数据,所有核心的 Business 都是通过 Kafka 进行消息的传递。这决定了 Kafka 在 Uber 的技术平台中占据非常核心的定位。经过 7 年的发展,Uber 的 Kafka 集群已经发展成为了全球数一数二的规模,每天处理 PB 级别的数据、Trillion 级别的消息。

从 2013 年到 2018 年,Uber 主要是踩坑,修复各种 Bug。到现阶段,整个消息平台已经相当复杂,三分之二的代码是自研,开源 Kafka 仅作为平台核心部分。但这个消息平台也不会被 Kafka 所绑定,当整个系统自动化、标准化工作完成后,又可以用潜在的其他更好的开源软件,像替换汽车引擎一样将这个核心替换掉。

近日 InfoQ 记者采访了 Uber Staff Engineer 富羽鹏,了解 Uber 消息平台实践过程,他将在 QCon 全球软件开发大会(北京站)2020 分享主题为《 Uber 大规模实时数据平台架构演进与实践》的演讲。

InfoQ:您认为应用开源Kafka 时,它本身有哪些问题是普通使用者必须要解决的?Uber 的 Kafka 有何特殊性?

富羽鹏:Uber 部署的 Kafka 集群规模,是全球数一数二的,每天处理 PB 级别的数据、Trillion 级别的消息。对于一般的小企业,一两个集群就够了。在 Uber,Kafka 的 Topic 数量非常多,生产者和消费者的量级也非常大,还有很多精细化的不同场景的 Use Case,我们会针对不同的场景部署集群,并对容灾性有比较多的考虑。

如果说只从代码量来看,开源 Kafka 只是我们在整个系统里面的最核心(Core)的一部分。从生态系统周边,我们层层把它(Core)包围起来,几乎有 2/3 是 Uber 自己开发的,来解决各种具体的问题。

所以对于中小企业来说,如果没有 Uber 这么大规模和复杂性的话,建议直接使用付费的解决方案。Kafka 开源几年了,已经是一个比较成熟的系统,但是到真正生产环境里,要很好的正确的使用和运维这套系统的话,需要针对不同场景,解决很多的设置和参数调优问题,出了故障还要知道如何快速排查与修复。需要使用者把这套系统吃透,这需要很多年的积累和沉淀。

也就是说对于一个新的企业来说,如果想很快的能把这个开源 Kafka 使用进去的话,有很多坑要踩,有很多路要走。

但对 Uber 来说,解决的问题已经不是使用开源软件踩坑的问题了。Uber 这种级别规模的部署,导致我们会遇到很多其他公司可能遇到不了的一些问题。在高吞吐量、高并发性出现的时候,会触发很多在普通的流量情况下遇不到的问题。我们 Kafka 的架构演进,都是因为目前的规模量带来的,需要我们不断的对系统进行迭代。

InfoQ:在 Uber 里,是否有实时备份机制 ISR 机制吗?一份 Topic 整体算起来会有多少份备份?

富羽鹏:Uber 在备份的问题上考虑的非常多。

备份并不是越多越好。备份越多,管理起来就越麻烦。另外一方面,备份的成本也非常昂贵。Uber 作为一个上市公司,成本是我们非常关注的一点。我们需要保证每一个副本,都是从容灾意义上去考虑的。在 Uber 内部,比如其他团队或组织里 Kafka 的 Service 用户,都需要了解他们到底使用了多少副本,重要的集群用四副本,不重要的、允许数据丢失的使用三副本。

与此同时,跨数据中心的拷贝,会略微复杂一些。每个 Service 需要在哪些 Region 或者 Data Center 进行拷贝,需要跟我们的用户进行协商。我们可以提供 SLA,把数据链发到某个 Cluster 上,我们需要保证在一定时间内,将它复制到指定的数据中心。如果是跨数据中心读取的话,其实这并不是一个非常经济的选择,因为这包括对网络带宽的使用,更长的延时。

对于实时和离线,处理会有很大的区别。对于离线数据会永久性的来保存。但实时数据本身是个流数据,它更像是一个 Buffer,是数据的一个缓存。我们对数据有一定的保存的时间的,比如说大部分的集群的话,我们只把数据保留三天,三天之后我们会把数据删除。大部分下游的消费者,他们更关注的是实时数据的消费。也就是说绝大多数的 Service 来讲,他们会在一天之内读走数据。我们留“三天”,更多的是为我们下游的消费者进行容灾。如果需要有更久的时间,比如说超过三天,我们会从 Data Lake 里面,再把这个数据再给读出来,再放到 Kafka  Topic 里面,再次消费。所以从副本使用的角度来讲,更关键的看 Data Lake 那一边,他们会将数据保留多久。

在一个 Topic 四个副本的机制下,最少也得有两个 Cluster,所以最少也会有八副本存在。再加多数据中心,副本数量就会加倍。Uber 每天产生 PB 级的数据,因此副本也至少有 PB 级。这一点,我们自己也在跟开源社区合作,做 Kafka 的 Remote Storage 功能。传统的 Kafka 都是在使用 Host 本身的硬盘和内存来存储数据。从 Cost 角度来讲,SSD 的价格是非常高的。同时我们发现,绝大多数数据的消费者只需要去读过去六个小时之内的数据。所以我们决定去修改本身的 Broker 代码,它会把数据在设定的时间里直接拷贝到指定的 Storage。可以让 Kafka 本身理解本地数据和 Remote 数据之间的关系,通过使用这样一套机制,把本地的副本量给降下来。

InfoQ:Uber 有没有必须不能丢数据的场景?以及不能容忍乱序的场景?如果有,能具体描述场景和问题是怎么解决的吗?

富羽鹏第一类场景是我们服务的一些最核心的业务。这种场景下,数据的可靠性、系统可靠性非常重要,数据不能有任何丢失。为了满足数据的 Lossless,我们做了很多相关的定制化开发。

从宏观的 Pipeline 角度来看,相当于说是一个 Topic,有上游的生产者 Producer 发数据,同时下游有 Consumer 来消费数据。但是在系统角度上来看,其实这个还有很多的 Stage。比如说它从 Client Service,从 Producer 来讲,它是在一个 client 这边,是在客户端这边开始发数据,它发的第一点,并不是直接发到我们 cluster broker 上面,而是发到我们叫 REST Proxy 这样一个 REST Server 上面。然后从 REST Proxy 的话,它会再发给这样一个 Broker。然后这个 Broker Cluster,用一个数据拷贝的 Pipeline,拷到其他的一个 Cluster。因为这样一个多区域的、多集群的架构。每个地域的生产者它都往本地集群发数据,但是对于消费者来讲,他们有时候可能比较关注全局的数据量。所以,我们会有一个数据拷贝的 Pipeline 来拷到另外一个 Cluster。

针对数据流的不同的 Stage,我们要真正做到 Lossless,就需要对每个 Stage 去考虑数据容灾。比如说机器坏了、数据量丢失,是不是应该重置、Retry,或者找其他的地方做一个 Buffer。基于每一点,我们都做了一些相应的定制化开发。另一个是要做数据的审查。就是怎么知道我们任何的数据丢失。针对 Pipeline 的每一个 Stage,我们都有拿一些数据,或者拿一些 Matrix 来进行比较。保证整个 Pipeline 下没有任何的一个数据丢失。

第二类场景是日志收集。这一类特点是数据量特别大。Uber 有成千上万个 Service,每天产生很多的日志,我们需要把日志通过 Kafka 给聚合起来。对于我们的 Cluster 就有非常高的时效性和高吞吐量支持的要求。对于这一场景我们做了提高吞吐量的优化。

第三类场景是将 Kafka 作为数据库的更改数据捕获 (change data capture, CDC)。特点是数据库对的 transaction 顺序性要求比较高。我们针对数据的有序性、吞吐量的要求也做了一些特别的开发工作。

第四类场景是将 Kafka 作为流处理平台数据的来源。在 Uber 我们有一个开源的流处理平台运行 Flink 的 job,它们从 Kafka 读取数据进行实时计算,计算完成后也可以将数据发回给 Kafka,再传回到下游;我们专门设置了一个集群,跟 Flink 做了深度的一些整合,做了一定的优化。

第五类场景也是一个非常有意思的定制化开发的,Kafka 的协议之上做了 Dead Letter Queue(DLQ),可以允许将个别不能暂时处理的信息放到另外一个 Queue 里面,之后再重新处理这些已经失败的 Message。对于 Kafka 的 Consumer API 来讲,如果有不能处理的 Message 通常就两个选择,要么直接丢弃消息,不处理,继续往前走;要么就不断的重试。

但是 Uber 有一些非常重要的 Topic,Kafka 本身的处理方法是不符合我们的要求的。比如说在处理一些跟 Money 相关的事情的时候,每个 Message 都是非常重要的。如果说 Message 丢失,客户的账就对不上了,这是不行的。但是系统又不能卡在这里,阻止后面的流水进来。

这种情况下我们需要有一个额外缓存的功能,如果当前处理不了,那就先把它分到另外的地方。然后过了一段时间再重新进行处理。我们在 Kafka 这一层之上,做另外的一套 Message Platform 来封装这些额外的功能。

InfoQ:您认为 Kafka 的演进,在 Uber大概分为几个阶段,分别解决的关键问题是什么

富羽鹏:Uber 里 Kafka 的演进,我认为主要分为三个阶段。

第一阶段,最早期的时候,是 2013 年到 2015 年之间,从架构角度来讲,我们主要是在使用 Kafka 开源版本。主要是提高稳定性,因为那个时候 Kafka 本身也比较早期,主要做很多 Bugfix、调优、多语言支持,让 Kafka 更好的匹配到 Uber 的各个不同的 Use Case,让整个架构可以更加稳定的运行起来。那时 Uber 的流量增长也很快,我们要保障在流量起来后,也不至于击垮集群。

另外还有一个有意思的工作是“数据拷贝”。我们对跨数据中心的拷贝做了非常多的优化。在开源的项目里面,Kafka 原生提出了一个 MirrorMaker 的项目,用于集群之间的拷贝。但 Uber 遇到了很多问题,于是开源了自己的一个拷贝项目 uReplicator。主要原因是在消息规模非常大的时候,MirrorMaker 有一些的性能、可靠性的问题,于是我们重新对整个架构进行了大改,最终开发了我们自己这样的一个开源项目。

第二阶段,从 2015 年到 2018 年之间。我们观察到之前的第一阶段,遇到了很多的一些可靠性、扩展性方面的问题,我们更需要去打造一个更加成熟的消息平台。这个时候对 Uber 的业务在爆炸性的增长,几乎就是每六个月 Uber 的 Business 会翻一番,也就意味着我们的数据量每六个月也会翻一番。但是我们的团队的人员并不能翻倍,于是需要考虑自动化方面的开发,来应对数据量和集群数量增长带来的挑战。另外这个时候也做了不少多租户管理方面的定制化工作,比如用户配额、用户黑名单以及重要 Topic 的物理隔离。此外,对于稳定性和容灾机制也做了很多,逐渐形成多地多活、备份集群的架构,另外前面所讲的 DLQ 也是那个时候开发出来的。

第三阶段,是从 2018 年到现在。这个阶段我们希望建立一个标准化、智能化与自动化的消息处理平台。因为我们的应用场景越来越多,数据查询、数据拷贝的需求越来越多,我们的系统越做越复杂。

我们整个生态系统,过去几年上下游有一些用户在开发自己的 Client 或者开源的 Client 和我们的 Kafka 平台进行交互。发展到一定阶段后,我们发现在整个 Uber 内部,林林总总有相当数量的不同语言、不同版本的客户端在跟我们的平台进行交互,让我们管理起来非常的困难。同时这些用户也会让我们的系统开发演进带来额外的挑战。比如说当我们要加一个新的功能或者弃用某些功能时,我们要考虑是不是有某些用户的客户端不能升级,会不会给他们的 Service 带来影响。当这个公司规模发展到一定阶段的时候,标准化就成为了必须要做得一件事情。

其中一个很重要的标准化工作是做 Consumer 端的 Proxy。这项工作目前只有 Uber 在做,其他的公司和开源还没有开始。在开源与 Uber 的 Kafka 架构中,生产者这边都有一个 REST Proxy。但是在消费者这端,因为逻辑会复杂很多,还没有 Proxy 这样的概念,我们在做的工作就是填补这个空白。

这个 Consumer Proxy 不但可以简化很多客户端的 API,同时也可以打破很多 Kafka 对于 Consumer 的一些限制。比如说 Kafka 有个限制是 Consumer 数量不能比 Partition 数量多。但是当我们做了 Consumer Proxy 以后,我们就可以把这个限制给去掉。于是一个八个 Partition 的 Topic 的 message 可以把数据发到成百上千个 Client 上面去。这样能大大降低了一个 Cluster 中 Partition 的的总数,降低 Partition 数量带来的物理资源开销。此外 DLQ 这个功能我们也做到了 Consumer Proxy 的 API 里面去,作为一个原生的功能。

还有一个方面标准化是,在做跨集群跨地域的元数据的整合与管理 Cluster Topology。也就是说把一整套的 Ecosystem、Service 都给整合起来。让所有的 Service 之间更有序,更加多连接起来。这样可以解决我们之前的这样一个有很多很多小 Service 但无序的问题,也同时可以快速的帮我们找到 Topology 中错误与缺失的问题,比如某个 Topic 的数据没有被备份到某个 Cluster 上。

我们的想法是:整套 Ecosystem 最核心的中间部分是一个 Kafka 的 Topic,这个 Topic 跟开源 Kafka 不一样,它可以在多个 Cluster 之间存在,而这套系统理解它的 Topology。这样当用户创建一个 Topic 的时候,可以根据他的需求自动在多个 Cluster 上面把这个 Topic 创建出来,同时在 Cluster 之间进行数据拷贝等。

有了 Cluster Topology 后,我们的系统还会做一些非常智能的事情,比如说当它发现这个 Cluster 有问题的时候,会自动的把 Producer 导到这个其他对应的 Cluster 等等。它可以大大降低管理人员的灾难发现与恢复工作,带来非常好的容灾效果,我们将这个项目叫做 Cluster Federation,是我们之前备份 Cluster 工作的延伸。这个项目的核心思想是认为一个 Topic 可以在多个 Cluster 之间存在。当一个 Cluster 出问题,数据可以自动迁移到另外一个 Cluster。这个系统本身的可以很智能的把存在多个 Cluster 上的 Topic 发送给下游的消费者,也就是说 Cluster 之间互为备份。

最后,我们还需要追求自动化和可伸缩性,来更加的有效的使用机器资源。在这段时间里,我们将所有的集群做“容器化”,这也是做自动化的一个先决条件。这给我们的运维带来了极大的便利,完成机器的添加置换等。我们的运维人数并没有增加,但是我们的机器的数量确实在成倍的增长,这都得益于“自动化”。

总体来说,我们最关注的是高可靠性。就当你数据量增长,或者说是数据规模增长很大的时候,怎么能保证这个系统的可靠性和鲁棒性。其次是可用性,当你这个数据量特别大的时候,如何有效、快速的来对整个平台进行管理,包括对数据本身的管理,包括上游,上下游客户的管理。

我们也在很关注一些新兴的开源软件,比如说像 Pulsar,这个开源软件解决了一些 Kafka 最开始架构上的一些缺陷,比如说存储计算分离。但它还需要被时间检验。但从这个角度来看,对 Uber 来说,通过我们在做的标准化工作,把两端的 Proxy 都给建立好了之后,我们的流处理平台的交互的衔接点,已经不是 Kafka 的协议了。在这个时候,我们可以像换汽车引擎一样,换成任何各种各样其他潜在的开源的解决方案,而不是说被某一个开源的项目绑定。

嘉宾介绍

富羽鹏,Uber Staff Engineer。在 Uber 负责实时数据与分析平台的架构与运营,包括 Kafka 及其周边生态系统。在加入 Uber 之前,是大数据存储平台 Alluxio 的创始成员与 PMC,再之前在 Palantir 从事大数据平台的研发与管理。本科与硕士毕业于清华大学,并在 University of California San Diego 进行了数据库方向的博士研究。

QCon 北京 2020 的分享中,富羽鹏老师将分享 Kafka 及其生态系统在 Uber 的架构演化以及探讨在实践中遇到的经验与教训,点击了解详情。

阅读数:12335 发布于:2020 年 4 月 17 日 18:00

评论

发布
暂无评论