写点什么

快手 Druid 精确去重的设计和实现

  • 2019-05-27
  • 本文字数:6869 字

    阅读完需:约 23 分钟

快手 Druid 精确去重的设计和实现

AI 大模型超全落地场景&金融应用实践,8 月 16 - 19 日 FCon x AICon 大会联诀来袭、干货翻倍!

本次分享内容提纲:


  • 快手 Druid 平台概览

  • Druid 精确去重功能设计

  • Druid 其他改进

  • 快手 Druid Roadmap

★ 快手 Druid 平台概览

1. 从业务需求角度考虑,快手为什么选择 Druid ?

快手的业务特点包括超大数据规模、毫秒级查询时延、高数据实时性要求、高并发查询、高稳定性以及较高的 Schema 灵活性要求;因此快手选择 Druid 平台作为底层架构。由于 Druid 原生不支持数据精确去重功能,而快手业务中会涉及到例如计费等场景,有精确去重的需求。因此,本文重点讲述如何在 Druid 平台中实现精确去重。另一方面,Druid 对外的接口是 json 形式 ( Druid 0.9 版本之后逐步支持 SQL ) ,对 SQL 并不友好,本文最后部分会简述 Druid 平台与 MySQL 交互方面做的一些改进。

2. 下面重点介绍快手 Druid 平台的架构:


  • 和大多数数据平台一样,快手的数据源主要有两种:基于 kafka 的实时数据源和建立在 hadoop 内的离线数据源。将这两种数据源分别通过 kafka index 和 hadoop index 导入到平台核心—— Druid 模型中。

  • 在 Druid 中,将数据进行业务隔离、冷热分层 ( 例如:冷数据放在硬盘,热数据放在 SSD )

  • 外部业务接口方面,支持通过 API 自行开发业务模块,支持通过 Kwai BI 做可视化;同时针对小部分人需求,现已支持 tableau ( tableau 通过 hive 连接 Druid ) 。

  • 数据平台辅助系统包括 metric 监控 ( 实时监控 CPU 占用率情况,进程情况,lag 信息等 ) 、探针系统 ( 用来查询,定义查询热度、维度等信息,有助于优化 ) 、管理系统 ( 在原生态 Druid 的 json 接口基础上定制管理系统,实现任务的可视化导入等管理操作 )

★ Druid 精确去重概述

1. 原生 Druid 去重功能支持情况

a. 维度列

  • cardinality agg,非精确,基于 hll 。查询时 hash 函数较耗费 CPU

  • 嵌套 group by,精确,耗费资源

  • 社区 DistinctCount 插件,精确,但是局限很大:

  • ① 仅支持单维度,构建时需要基于该维度做 hash partition

  • ② 不能跨 interval 进行计算

b. 指标列

  • HyperUniques/Sketch,非精确,基于 hll,摄入时做计算,相比 cardinality agg 性能更高


结论:Druid 缺乏一种支持预聚合、资源占用低、通用性强的精确去重支持。

2. 精确去重方案

a. 精确去重方案:hashset

使用 hashset 的方式存储,具备简单通用等优点,但是资源占用非常大。以下图所示为例,将左上角的原始数据使用 hashset 方法,Year 作为维度列,City 作为指标列;按照维度列分成 2 个 Segment,转换为右上角的数据格式;接下来将指标列聚合到 broker 中,计算出 size,即得到最终结果。整个过程如下图所示:



但是使用 hashset 的去重方式,其资源占用非常大;具体来说,假设有 5000W 条平均长度为 10B 的 string ( 共 500MB ) ,中间生成的 Hashset 会产生一系列链表结构,导致内存可达到 5G,即内存扩展可达 10 倍。因此,直接使用 hashset 的方法可以是完全不可行的。


在 hashset 方法基础上可以考虑一些优化的思路:例如,通过增加节点,将数据分散到不同机器上,查询时在每台机器上分别聚合进而求和;另一种方式类似于 MapReduce,计算前利用 shuffle 模型将数据打散。然而这两种优化思路和 Druid 平台属性有冲突,因此都不适用于 Druid 这个平台。

b. 精确去重方案:字典编码+Bitmap

Bitmap 方法,即用一个位来表示一个数据 ( 这是理论上的最小值 ) 。这种方案的思路是:对要去重的数据列做编码,将 string 或其他类型的数据转化成 int 型的编码,摄入到 Druid 后通过 Bitmap 的形式存储;查询时将多个 Bitmap 做交集,获取结果。Bitmap 的范围是 42 亿,最大占用空间 500M 左右;还可以使用压缩算法,使空间占用大大减少。


因此,字典编码+Bitmap 的方式,优点是存储和查询资源占用少,可以将构建和查询分开 ( 即字典不参与查询 ) ;缺点是:字典需要做全局编码,其构建过程较复杂。


考虑到 Kylin 曾使用过这种方案,因此快手的后续优化方案选择在这种方案的基础上进行优化。

3. 字典编码方案

a. 字典编码方案的使用

① Redis 方案


Redis 可以进行实时编码,可以同时支持离线任务和实时任务,因此,实时处理数据和离线处理数据可以使用相同的方案。


但是 Redis 的 ID 生成速度有瓶颈,最大 5W/s;查询上压力也较大,对交互会造成一定的麻烦;同时对 Redis 的稳定性要求也比较高。


② 离线 MR 方案


离线 MR 方案,利用 MR 的分布式存储,编码和查询的吞吐量更高,还用用更高的容错性。但是 MR 仅支持离线导入任务。


综合这两种字典编码方案的优劣,得到以下结论:


  • 离线任务使用离线 MR 编码,通过小时级任务解决实现准实时

  • 实时任务仅支持原始 int 去重 ( 无需编码 )

b. 字典编码方案模型

快手的字典编码方案参照的是 Kylin 的 AppendTrie 树模型,模型详见下图。



Trie 树模型主要是使用字符串做编码;快手支持各种类型数据,可将不同类型数据统一转换成字符串类型,再使用 Trie 树模型做编码。Trie 树模型可以实现 Append,有两种方式:


  • 按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。

  • 记录当前模型最大 ID,便于给新增节点分配 ID 。


但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用率,选择单颗子树设定阈值,超过即分裂,进而控制单颗子树内存消耗。分裂后,每棵树负责一个范围 ( 类似于 HBase 中的 region 分区 ) ;查询时候只需要查询一颗子树即可。



为节省 CPU 资源,基于 guava LoadingCache,按需加载子树 ( 即懒加载 ) ,并使用 LRU 方法换出子树。


LRU ( Least Recently Used ) 是内存管理的一种置换算法,即内存不够时,将最近不常用的子树从内存清除,加载到磁盘上。LRU 算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。

c. 字典并发构建:

字典会存在并发构建的问题,分别使用 MVCC 以及 Zookeeper 分布式锁的方案。


① 使用 MVCC ( 持久化在 hdfs 上 ) ,在读取的时候会使用最新版本,拷贝到 working 中进行构建,构建完成生成新的版本号;如历史数据过多,会根据版本个数及 TTL 进行清理。


② 构建字典的过程中会操作临时目录,如果存在多个进程同时去写临时目录,会存在冲突问题;因此引入 Zookeeper 分布式锁,基于 DataSource 和列来做唯一的定位,从而保证同一个字典同时只能有一个进程。

d. 精确去重的实现

① 新增 unique 指标存储


  • 使用 ComplexMetricSerde 定义一个指标如何 Serde,即序列化与反序列化;

  • 使用 Aggregator,让用户定义一个指标如何执行聚合,即上卷;

  • 使用 Aggregator 的 Buffer 版本 BufferAggregator 作为 Aggregator 的替代品;

  • 使用 AggregatorFractory 定义指标的名称,获取具体的 Aggregator 。


② 精确去重的整体流程介绍



  • 使用 DetermineConfigurationJob 计算最终 Segment 分片情况。

  • 使用 BuildDictJob,Map 将同一去重列发送到一个 reducer 中 ( Map 端可先 combine ) ;每个 reducer 构建一列的全局字典;每列字典构建申请 ZK 锁。

  • IndexGeneratorJob,在 Map 中加载字典,将去重列编码成 int;Reducer 将 int 聚合成 Bitmap 存储;每一个 reducer 生成一个 Segment。


此套精确去重的使用方法:


  • 导入时,定义 unique 类型指标


  • 查询时,使用 unique 类型查询


e. 性能优化与测试

① 优化字典查询


· 存在一个超高基数列 ( UHC ) 的去重指标:在 IndexGenerator 之前增加 ClusterBy UHC 任务,保证每个 map 处理的 uhc 列数据有序,避免多次换入换出。


· 存在多个超高基数列的去重指标:拆成多个 DataSource,调大 IndexGenerator 任务的 map 内存,保证字典能加载到内存


② Bitmap 查询优化


减少 targetPartitionSize 或调大 numShards,增加 Segment 个数,提升并行度。


使用 Bitmap 做 or 计算时候,建议使用 batchOr 方法,而不是逐行 or,避免 I/O 与 or 计算交替,缓存持续被刷新导致性能降低。关于 batchOr 的选择,rolling_bitmap 提供一些接口,默认选择 naive_or ( 尽量延迟计算模型 ) ,通常情况下其性能较好;priorityqueue_or 使用堆排序,每次合并最小两个 bitmap,消耗更多内存,官方建议 benchmark 后使用;如果结果是较长的连续序列,可以选择手动按顺序依次 inplace or。


另一个思路是 Croaring-high performance low-level implementation


Rolling bitmap 用 C 语言实现了 Simd,相对于 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了 80%。


快手将 Smid 以 JNI 的方式引入,对 100W 的 Bitmap 做 Or 操作,使用 Java 版本单线程操作,计算用时 13s;用 JNI 调用 Croaring,数据进行反序列化(需要 memcpy 用时 6.5s),计算消耗 7.5s,总用时 14s(慢于 java 版本)。Croaring 暂时不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,导致其实际运行效率与官方称的 80%提升不一致。


③ 传输层编码


Broker 与 Historical 之间通过 http 协议交换数据,默认打开 gzip 压缩 ( 也可以选择不压缩,即 plain json ) 。测试发现 gzip 压缩的过程中会耗用大量 CPU,因此在万兆的网络下建议不压缩。



④ 对上述查询优化步骤依次性能测试:


8 个维度,10 亿基数的数据,选择列 author_id 作为其去重指标,摄入后达 150W 行;10 台 historical 机器:



未做优化去重,查询时间 50s;


增加 Segment 数量至 10,查询时间为 7s,提升约 7 倍性能;


将 Or 的策略设置为 BatchOr,查询时间为 4s;


关闭 gzip 压缩,查询时间为 2s。


以上是对 10 亿基数数据作去重的查询时间;如果数据基数在 1 亿以下,查询时间为毫秒级。

★ Druid 其他方面做的改进

1. 资源隔离部署方案


该方案为 Druid 官方推荐部署方案,充分利用 Druid 的分层特质性,根据业务,根据数据冷热,分到不同 proxy 上,保证各个业务相互不受影响。

2. 物化视图

物化视图是包括一个查询结果的数据库对象,可以将其理解为远程数据的的本地副本,或者用来生成基于数据表求和的汇总表,这些副本是只读的。


如果想修改本地副本,必须用高级复制的功能;如果想从一个表或视图中抽取数据时,可以用从物化视图中抽取。物化视图可以通过数据库的内部机制可以定期更新,将一些大的耗时的表连接用物化视图实现,会提高查询的效率。

a. 维度物化

Druid 社区 0.13 开始具备物化视图功能,社区实现了维度的物化。应用场景如下:原始数据有很高维度,而实际查询使用到的维度是原始维度的子集;因此不妨对原始维度做小的聚合 ( 类似 Kylin 中的 cube 和 cuboid ) ;对经常查询的部分维度做物化。


b. 时序物化

除了维度上的物化,还包括时序上的物化,例如将原始数据按照小时聚合、按照分钟聚合。


c. 物化效果


这里,主要关心物化膨胀率和物化命中率,两项指标会影响最终的查询效率。物化膨胀率,表示物化后的数据存储耗用资源情况;物化命中率,可以描述物化后与实际应用场景的匹配度。表格中,ds4 和 ds5 都做到了物化膨胀率很低,物化命中率很高,因此,查询效率可获得 7~9 倍的提升。

3. Historical 快速重启

快手数据平台硬件资源,大约是 12 块容量 2T 的 SATA 硬盘,数据量平均为 10W segments,数据占用 10T 空间,重启一次大约 40min。


Druid 中,默认重启的时候会加载全部数据;考虑到超过 10T 的元信息在启动时并不需要,可以将加载数据推迟到查询时候。因此,使用 Guava Suppliers.memoize 命令,延迟数据加载信息,只有在查询的时候才作列的加载。此过程通过一个参数 ( LazyLoad ) 控制。


( 此代码已提交 Druid 社区:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )


优化后,重启过程只需要 2min,提升 20 倍。

4. Kafka Index 方面的改进

a. TaskCount 自动伸缩

Kafka index 任务数即 TaskCount 为固定,需要按照峰值任务数设定,这样导致在非高峰时刻会存在资源浪费。


这里实施的一个优化策略是:


  • KafkaSupervisor 增加 DynamicTaskCountNotice

  • 基于 task 的 cpu use & kafka lag 阈值,进行 25%增减 task count

  • 无需重启 supervisor


b. 精细化调度

Middle Manager 的 indexing task 资源分配从 slot 改成按照内存大小分配 ( 类似于 MapReduce 从 1.0 到 2.0 的改进 ) ,具体优化方法如下:


  • 区分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要占用内存 )

  • 允许在提交 task 时指定 task 内存大小。



使用这种方式优化,实时数据任务处理可节省超过 65%的内存占用,离线数据任务处理可节省超过 87%的内存占用。

5. 元数据交互

元数据 ( Metadata ),又称中介数据、中继数据,主要是描述数据属性 ( property ) 的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能,是关于数据的组织、数据域及其关系的信息,可以看作是一种电子式目录。简言之,元数据就是关于数据的数据。

a. Overlord 与 MySQL 交互优化

Overlord 节点负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方,Overlord 有两种运行模式:本地模式或者远程模式 ( 默认本地模式 ) 。


Overlord 控制台可以查看等待的任务、运行的任务、可用的 worker,最近创建和结束的 worker 等。


Segment 是 Druid 中最基本的数据存储单元,采用列式的方式存储某一个时间间隔 ( interval ) 内某一个数据源 ( dataSource ) 的部分数据所对应的所有维度值、度量值、时间维度以及索引。


Segment 的逻辑名称结构为:


<dataSource><intervalStart><intervalEnd><version><partitionNum>


<dataSource> 表示数据源 ( 或表 ) ;<intervalStart> 和 <intervalEnd> 分别表示时间段的起止时间;<version> 表示版本号,用于区分多次加载同一数据对应的 Segment;<partitionNum> 表示分区编号 ( 在每个 interval 内,可能会有多个 partition )


Segments 在 HDFS 上的物理存储路径下包括两个文件:descriptor.json 和 index.zip 。前者记录的是 Segment 的描述文件(样例见下表),其内容也保存在 Druid 集群的元数据的 druid_segments 表中;index.zip 则是数据文件。



描述文件descriptor.json样例:{"dataSource": "AD_active_user","interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00","version": "2018-04-01T00:04:07.022+08:00","loadSpec": {"type": "hdfs","path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"},"dimensions": "appkey,spreadid,pkgid","metrics": "myMetrics,count,offsetHyperLogLog","shardSpec": {"type": "numbered","partitionNum": 1,"partitions": 0},"binaryVersion": 9,"size": 168627,"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"}

复制代码


将 druid_segments 增加索引 ( dataSource,used,end ) ,查询时间从 10s 优化到 1s 。

b. Coordinator 与 MySQL 交互优化

Druid 的 Coordinator 节点主要负责 Segment 的管理和分发。具体的就是,Coordinator 会基于配置与历史节点通信加载或丢弃 Segment 。Druid Coordinator 负责加载新的 Segment,丢弃过期的 Segment,管理 Segment 的副本以及 Segment 的负载均衡。


Coordinator 会根据配置参数里的设置的事件定期的执行,每次执行具体的操作之前都会估算集群的状态。与 broker 节点和 historycal 节点类似的,Coordinator 节点也会和 Zookeeper 集群保持连接用于交互集群信息。同时,Coordinator 也会和保存着 Segment 和 rule 信息的数据源保持通信。可用的 Segment 会被存储在 Segment 的表中,并且列出了所有应该被集群加载的 Segment 。Rule 保存在 rule 的表中,指出了应该如何处理 Segment 。


① Segment 发现


将 Coordinator 中默认的全量扫描 druid_segments 表改成增加读取,druid_segments 添加索引 ( used,created_date ) ,查询时间从 1.7min 优化到 30ms 。


② 整个协调周期


  • Segment 之前使用 TreeSet 按时间排序确保优先 load 最近的 segment 现在优化成 ConcurrentHashSet,并分成最近一天和一天前的两个 set,先操作最近一天的;

  • Apply rules 的时候对 LoadRule 先判断集群副本和配置的是否一致,是的话就跳过,提升并发能力;

  • Cleanup 和 overshadow 两个操作合并;

  • 查询时间从 3min 优化到 30s 。

★ 快手 Druid Roadmap

1. 在线化

今年会实现多集群、高可用,着重保证在线业务的 SLA,同时线上业务也要着重权限的管理。

2. 易用性

实现对 SQL 的支持,同时快手的 BI 产品也会做相应的升级和优化。

3. 性能及优化 ( Druid 内核方面,与社区合作 )

  • 自适应的物化视图,智能化

  • 数值型索引

  • 向量化引擎


最后附上我们提交社区的代码:


https://github.com/apache/incubator-druid/pull/7594( 精确去重功能 )


https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重启 )

作者介绍:

邓钫元,快手大数据架构团队研发工程师,毕业于浙江大学,曾就职于百度、贝壳,目前负责快手 Druid 平台研发工作,多年底层集群以及 OLAP 引擎研发、分布式系统的优化经验,热衷开源,为 hadoop / kylin / druid 等社区贡献代码。


本文来自 DataFun 社区


原文链接


https://mp.weixin.qq.com/s/jDW1sordtki-O5-tsVE94g


2019-05-27 08:0024105

评论

发布
暂无评论
发现更多内容

【架构师训练营】第1周-作业-食堂就餐卡系统

芥末

极客大学架构师训练营

免费P7架构师直播课!技术人员如何提升职场技能?

奈学教育

架构师

食堂就餐卡系统设计(作业版)

Jerry Tse

极客大学架构师训练营 作业

写作的几点建议:面对卡文,写别人的题目,栩栩如生的写作

董一凡

写作

架构师训练营-第一课作业-20200610-食堂就餐卡系统

👑👑merlan

架构 作业

第一周作业

东哥

极客大学架构师训练营

食堂就餐卡系统设计

魔曦

极客大学架构师训练营

While语句

Hello

4天如何完爆Kafka源码核心流程!

奈学教育

kafka

UML 体验(就餐卡系统设计)

陈皮

如何成为一个架构师?

逍遥乐天

极客大学架构师训练营

你还在为 TCP 重传、滑动窗口、流量控制、拥塞控制发愁吗?看完图解就不愁了

小林coding

TCP 计算机网络 网络协议

作业1

annie

极客大学架构师训练营

第一周学习总结

小海豚

学习

食堂就餐卡系统设计

互金从业者X

01-kubernetes安装部署(手动)

绿星雪碧

Kubernetes etcd flannel

架构师训练营第一周学习总结

whiter

极客大学架构师训练营

【总结】第一周架构师如何做架构

chengjing

架构师是什么?

芥末

极客大学架构师训练营

被迫重构代码,这次我干掉了 if-else

程序员小富

架构师培训-01食堂就餐卡系统设计文档

刘敏

食堂就餐卡系统设计

小海豚

学习 食堂就餐卡系统设计

架构师训练营第01周——总结

李伟

极客大学架构师训练营

4天如何完爆Kafka源码核心流程!

古月木易

kafka

第一周命题作业

AspYc

免费P7架构师直播课!技术人员如何提升职场技能?

古月木易

架构师

第一周学习总结

AspYc

食堂就餐卡系统架构设计文档

小叶

架构设计

架构师训练营-作业-1】食堂就餐卡系统设计

superman

学习 极客大学架构师训练营

一味的坚持,或许只是徒劳

山楂大卷

逻辑思维 职业成长 工作体会

食堂就餐卡系统设计

架构设计 极客大学架构师训练营

快手 Druid 精确去重的设计和实现_架构_DataFunTalk_InfoQ精选文章