58 同城基于 Flink 的千亿级实时计算平台架构实践

阅读数:2 2020 年 1 月 8 日 14:00

58同城基于Flink的千亿级实时计算平台架构实践

本文由 dbaplus 社群授权转载。

58 同城作为覆盖生活全领域的服务平台,业务覆盖招聘、房产、汽车、金融、二手及本地服务等各个方面。丰富的业务线和庞大的用户数每天产生海量用户数据需要实时化的计算分析,实时计算平台定位于为集团海量数据提供高效、稳定、分布式实时计算的基础服务。本文主要介绍 58 同城基于 Flink 打造的一站式实时计算平台 Wstream。

实时计算场景

和很多互联网公司一样,实时计算在 58 拥有丰富的场景需求,主要包括以下几类:

  • 实时数据 ETL:实时消费 Kafka 数据进行清洗、转换、结构化处理用于下游计算处理。
  • 实时数仓:实时化数据计算,仓库模型加工和存储。实时分析业务及用户各类指标,让运营更加实时化。
  • 实时监控:对系统和用户行为进行实时检测和分析,如业务指标实时监控,运维线上稳定性监控,金融风控等。
  • 实时分析:特征平台,用户画像,实时个性化推荐等。

平台演进

58同城基于Flink的千亿级实时计算平台架构实践

在实时计算平台建设过程中,主要是跟进开源社区发展以及实际业务需求,计算框架经历了 Storm 到 Spark Streaming 到 Flink 的发展,同时建设一站式实时计算平台,旨在提升用户实时计算需求开发上线管理监控效率,优化平台管理。

实时计算引擎前期基于 Storm 和 Spark Streaming 构建, 很多情况下并不能很好的满足业务需求,如商业部门基于 Spark Streaming 构建的特征平台希望将计算延迟由分钟级降低到秒级,提升用户体验,运维监控平台基于 Storm 分析公司全量 nginx 日志对线上业务进行监控,需要秒级甚至毫秒级别的延迟,Storm 的吞吐能力成为瓶颈。

同时随着实时需求不断增加,场景更加丰富,在追求任务高吞吐低延迟的基础上,对计算过程中间状态管理,灵活窗口支持,以及 exactly once 语义保障的诉求越来越多。Apache Flink 开源之后,支持高吞吐低延迟的架构设计以及高可用的稳定性,同时拥有实时计算场景一系列特性以及支持实时 Sql 模型,使我们决定采用 Flink 作为新一代实时计算平台的计算引擎。

平台规模

58同城基于Flink的千亿级实时计算平台架构实践

实时计算平台当前主要基于 Storm/Spark Streaming/Flink,集群共计 500 多台机器,每天处理数据量 6000 亿 +,其中 Flink 经过近一年的建设,任务占比已经达到 50%。

Flink 稳定性

Flink 作为实时计算集群,可用性要求远高于离线计算集群。为保障集群可用性,平台主要采用任务隔离以及高可用集群架构保障稳定性。

任务隔离

在应用层面主要基于业务线以及场景进行机器隔离,队列资源分配管理,避免集群抖动造成全局影响。

58同城基于Flink的千亿级实时计算平台架构实践

集群架构

Flink 集群采用了 ON YARN 模式独立部署,为减少集群维护工作量,底层 HDFS 利用公司统一 HDFS Federation 架构下建立独立的 namespace,减少 Flink 任务在 checkpoint 采用 hdfs/rocksdb 作为状态存储后端场景下由于 hdfs 抖动出现频繁异常失败。

在资源隔离层面,引入 Node Label 机制实现重要任务运行在独立机器,不同计算性质任务运行在合适的机器下,最大化机器资源的利用率。同时在 YARN 资源隔离基础上增加 Cgroup 进行物理 cpu 隔离,减少任务间抢占影响,保障任务运行稳定性。

58同城基于Flink的千亿级实时计算平台架构实践

平台化管理

Wstream 是一套基于 Apache Flink 构建的一站式、高性能实时大数据处理平台。提供 SQL 化流式数据分析能力,大幅降低数据实时分析门槛,支持通过 DDL 实现 source/sink 以及维表,支持 UDF/UDAF/UDTF,为用户提供更强大的数据实时处理能力。支持多样式应用构建方式 FlinkJar/Stream SQL/Flink-Storm,以满足不同用户的开发需求,同时通过调试,监控,诊断,探查结果等辅助手段完善任务生命周期管理。

58同城基于Flink的千亿级实时计算平台架构实践

流式 sql 能力建设

Stream SQL 是平台为了打造 sql 化实时计算能力,减小实时计算开发门槛,基于开源的 Flink,对底层 sql 模块进行扩展实现以下功能:

  • 支持自定义 DDL 语法(包括源表, 输出表, 维表)
  • 支持自定义 UDF/UDTF/UDAF 语法
  • 实现了流与维表的 join, 双流 join

在支持大数据开源组件的同时,也打通了公司主流的实时存储平台。同时为用户提供基于 Sql client 的 cli 方式以及在 Wstream 集成了对实时 sql 能力的支持,为用户提供在线开发调试 sql 任务的编辑器,同时支持代码高亮,智能提示,语法校验及运行时校验,尽可能避免用户提交到集群的任务出现异常。

另外也为用户提供了向导化配置方式,解决用户定义 table 需要了解复杂的参数设置,用户只需关心业务逻辑处理,像开发离线 Hive 一样使用 sql 开发实时任务。

58同城基于Flink的千亿级实时计算平台架构实践

Storm 任务迁移 Flink

在完善 Flink 平台建设的同时,我们也启动 Storm 任务迁移 Flink 计划,旨在提升实时计算平台整体效率,减少机器成本和运维成本。Flink-Storm 作为官方提供 Flink 兼容 Storm 程序为我们实现无缝迁移提供了可行性,但是作为 beta 版本,在实际使用过程中存在很多无法满足现实场景的情况,因此我们进行了大量改进,主要包括实现 Storm 任务 on yarn ,迁移之后任务 at least once 语义保障,兼容 Storm 的 tick tuple 机制等等。

58同城基于Flink的千亿级实时计算平台架构实践

通过对 Fink-Storm 的优化,在无需用户修改代码的基础上,我们已经顺利完成多个 Storm 版本集群任务迁移和集群下线,在保障实时性及吞吐量的基础上可以节约计算资源 40% 以上,同时借助 yarn 统一管理实时计算平台无需维护多套 Storm 集群,整体提升了平台资源利用率,减轻平台运维工作量。

任务诊断

指标监控

Flink webUI 提供了大量的运行时信息供用户了解任务当前运行状况,但是存在无法获取历史 metrics 的问题导致用户无法了解任务历史运行状态,因此我们采用了 Flink 原生支持的 Prometheus 进行实时指标采集和存储。

Prometheus 是一个开源的监控和报警系统,通过 pushgateway 的方式实时上报 metrics,Prometheus 集群采用 Fedration 部署模式,meta 节点定时抓取所有子节点指标进行汇总,方便统一数据源提供给 Grafana 进行可视化以及告警配置。

58同城基于Flink的千亿级实时计算平台架构实践

任务延迟

吞吐能力和延迟作为衡量实时任务性能最重要的指标,我们经常需要通过这两个指标来调整任务并发度和资源配置。Flink Metrics 提供 latencyTrackingInterval 参数启用任务延迟跟踪,打开会显著影响集群和任务性能,官方高度建议只在 debug 下使用。

在实践场景下,Flink 任务数据源基本都是 Kafka,因此我们采用 topic 消费堆积作为衡量任务延迟的指标,监控模块实时通过 Flink rest 获取任务正在消费 topic 的 offset,同时通过 Kafka JMX 获取对应 topic 的 logsize,采用 logsize– offset 作为 topic 的堆积。

58同城基于Flink的千亿级实时计算平台架构实践

日志检索

Flink 作为分布式计算引擎,所有任务会由 YARN 统一调度到任意的计算节点,因此任务的运行日志会分布在不同的机器,用户定位日志困难。我们通过调整 log4j 日志框架默认机制,按天切分任务日志,定期清理过期日志,避免异常任务频繁写满磁盘导致计算节点不可用的情况,同时在所有计算节点部署 agent 实时采集日志,汇聚写入 Kafka,通过日志分发平台实时将数据分发到 ES,方便用户进行日志检索和定位问题。

Flink 优化

在实际使用过程中,我们也针对业务场景进行了一些优化和扩展,主要包括:

1)Storm 任务需要 Storm 引擎提供 ack 机制保障消息传递 at least once 语义,迁移到 Flink 无法使用 ack 机制,我们通过定制 KafakSpout 实现 checkpoint 相关接口,通过 Flink checkpoint 机制实现消息传递不丢失。另外 Flink-Storm 默认只能支持 standalone 的提交方式,我们通过实现 yarn client 相关接口增加了 storm on yarn 的支持。

2)Flink 1.6 推荐的是一个 TaskManager 对应一个 slot 的使用方式,在申请资源的时候根据最大并发度申请对应数量的 TaskManger,这样导致的问题就是在任务设置 task slots 之后需要申请的资源大于实际资源。

我们通过在 ResoureManager 请求资源管理器 SlotManager 的时候增加 TaskManagerSlot 相关信息,用于维护申请到的待分配 TaskManager 和 slot,之后对于 SlotRequests 请求不是直接申请 TaskManager,而是先从 SlotManager 申请是否有足够 slot,没有才会启动新的 TaskManger, 这样就实现了申请资源等于实际消耗资源,避免任务在资源足够的情况下无法启动。

58同城基于Flink的千亿级实时计算平台架构实践

3)Kafak Connector 改造,增加自动换行支持,另外针对 08source 无法设置 client.id,通过将 client.id 生成机制优化成更有标识意义的 id,便于 Kafka 层面管控。

4)Flink 提交任务无法支持第三方依赖 jar 包和配置文件供 TaskManager 使用,我们通过修改 flink 启动脚本,增加相关参数支持外部传输文件,之后在任务启动过程中通过将对应的 jar 包和文件加入 classpath,借助 yarn 的文件管理机制实现类似 spark 对应的使用方式,方便用户使用。

5)业务场景存在大量实时写入 hdfs 需求,Flink 自带 BucketingSink 默认只支持 string 和 avro 格式,我们在此基础上同时支持了 LZO 及 Parquet 格式写入,极大提升数据写入性能。

后续规划

实时计算平台当前正在进行 Storm 任务迁移 Flink 集群,目前已经基本完成,大幅提升了平台资源利用率和计算效率。后续将继续调研完善 Flink 相关能力,推动 Flink 在更多的实时场景下的应用,包括实时规则引擎,实时机器学习等。

作者介绍

冯海涛 / 万石康,负责 58 同城实时计算平台建设。

原文链接

https://mp.weixin.qq.com/s?__biz=MzI4NTA1MDEwNg==&mid=2650784251&idx=1&sn=3f90f0eadad3b781200ec8b31d281dfd&chksm=f3f9746ec48efd78b0e10fa9a3e9fe646ef385c9450762ed634194760b03a528b561dbcab321&scene=27#wechat_redirect

评论

发布