PCon全球产品创新大会9折购票中,立减¥480!查看上线日程 了解详情
写点什么

快手 Druid 精确去重的设计和实现

2019 年 5 月 27 日

快手 Druid 精确去重的设计和实现

本次分享内容提纲:


  • 快手 Druid 平台概览

  • Druid 精确去重功能设计

  • Druid 其他改进

  • 快手 Druid Roadmap


★ 快手 Druid 平台概览

1. 从业务需求角度考虑,快手为什么选择 Druid ?

快手的业务特点包括超大数据规模、毫秒级查询时延、高数据实时性要求、高并发查询、高稳定性以及较高的 Schema 灵活性要求;因此快手选择 Druid 平台作为底层架构。由于 Druid 原生不支持数据精确去重功能,而快手业务中会涉及到例如计费等场景,有精确去重的需求。因此,本文重点讲述如何在 Druid 平台中实现精确去重。另一方面,Druid 对外的接口是 json 形式 ( Druid 0.9 版本之后逐步支持 SQL ) ,对 SQL 并不友好,本文最后部分会简述 Druid 平台与 MySQL 交互方面做的一些改进。


2. 下面重点介绍快手 Druid 平台的架构:


  • 和大多数数据平台一样,快手的数据源主要有两种:基于 kafka 的实时数据源和建立在 hadoop 内的离线数据源。将这两种数据源分别通过 kafka index 和 hadoop index 导入到平台核心—— Druid 模型中。

  • 在 Druid 中,将数据进行业务隔离、冷热分层 ( 例如:冷数据放在硬盘,热数据放在 SSD )

  • 外部业务接口方面,支持通过 API 自行开发业务模块,支持通过 Kwai BI 做可视化;同时针对小部分人需求,现已支持 tableau ( tableau 通过 hive 连接 Druid ) 。

  • 数据平台辅助系统包括 metric 监控 ( 实时监控 CPU 占用率情况,进程情况,lag 信息等 ) 、探针系统 ( 用来查询,定义查询热度、维度等信息,有助于优化 ) 、管理系统 ( 在原生态 Druid 的 json 接口基础上定制管理系统,实现任务的可视化导入等管理操作 )


★ Druid 精确去重概述

1. 原生 Druid 去重功能支持情况

a. 维度列

  • cardinality agg,非精确,基于 hll 。查询时 hash 函数较耗费 CPU

  • 嵌套 group by,精确,耗费资源

  • 社区 DistinctCount 插件,精确,但是局限很大:

  • ① 仅支持单维度,构建时需要基于该维度做 hash partition

  • ② 不能跨 interval 进行计算


b. 指标列

  • HyperUniques/Sketch,非精确,基于 hll,摄入时做计算,相比 cardinality agg 性能更高


结论:Druid 缺乏一种支持预聚合、资源占用低、通用性强的精确去重支持。


2. 精确去重方案

a. 精确去重方案:hashset

使用 hashset 的方式存储,具备简单通用等优点,但是资源占用非常大。以下图所示为例,将左上角的原始数据使用 hashset 方法,Year 作为维度列,City 作为指标列;按照维度列分成 2 个 Segment,转换为右上角的数据格式;接下来将指标列聚合到 broker 中,计算出 size,即得到最终结果。整个过程如下图所示:



但是使用 hashset 的去重方式,其资源占用非常大;具体来说,假设有 5000W 条平均长度为 10B 的 string ( 共 500MB ) ,中间生成的 Hashset 会产生一系列链表结构,导致内存可达到 5G,即内存扩展可达 10 倍。因此,直接使用 hashset 的方法可以是完全不可行的。


在 hashset 方法基础上可以考虑一些优化的思路:例如,通过增加节点,将数据分散到不同机器上,查询时在每台机器上分别聚合进而求和;另一种方式类似于 MapReduce,计算前利用 shuffle 模型将数据打散。然而这两种优化思路和 Druid 平台属性有冲突,因此都不适用于 Druid 这个平台。


b. 精确去重方案:字典编码+Bitmap

Bitmap 方法,即用一个位来表示一个数据 ( 这是理论上的最小值 ) 。这种方案的思路是:对要去重的数据列做编码,将 string 或其他类型的数据转化成 int 型的编码,摄入到 Druid 后通过 Bitmap 的形式存储;查询时将多个 Bitmap 做交集,获取结果。Bitmap 的范围是 42 亿,最大占用空间 500M 左右;还可以使用压缩算法,使空间占用大大减少。


因此,字典编码+Bitmap 的方式,优点是存储和查询资源占用少,可以将构建和查询分开 ( 即字典不参与查询 ) ;缺点是:字典需要做全局编码,其构建过程较复杂。


考虑到 Kylin 曾使用过这种方案,因此快手的后续优化方案选择在这种方案的基础上进行优化。


3. 字典编码方案

a. 字典编码方案的使用

① Redis 方案


Redis 可以进行实时编码,可以同时支持离线任务和实时任务,因此,实时处理数据和离线处理数据可以使用相同的方案。


但是 Redis 的 ID 生成速度有瓶颈,最大 5W/s;查询上压力也较大,对交互会造成一定的麻烦;同时对 Redis 的稳定性要求也比较高。


② 离线 MR 方案


离线 MR 方案,利用 MR 的分布式存储,编码和查询的吞吐量更高,还用用更高的容错性。但是 MR 仅支持离线导入任务。


综合这两种字典编码方案的优劣,得到以下结论:


  • 离线任务使用离线 MR 编码,通过小时级任务解决实现准实时

  • 实时任务仅支持原始 int 去重 ( 无需编码 )


b. 字典编码方案模型

快手的字典编码方案参照的是 Kylin 的 AppendTrie 树模型,模型详见下图。



Trie 树模型主要是使用字符串做编码;快手支持各种类型数据,可将不同类型数据统一转换成字符串类型,再使用 Trie 树模型做编码。Trie 树模型可以实现 Append,有两种方式:


  • 按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。

  • 记录当前模型最大 ID,便于给新增节点分配 ID 。


但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用率,选择单颗子树设定阈值,超过即分裂,进而控制单颗子树内存消耗。分裂后,每棵树负责一个范围 ( 类似于 HBase 中的 region 分区 ) ;查询时候只需要查询一颗子树即可。



为节省 CPU 资源,基于 guava LoadingCache,按需加载子树 ( 即懒加载 ) ,并使用 LRU 方法换出子树。


LRU ( Least Recently Used ) 是内存管理的一种置换算法,即内存不够时,将最近不常用的子树从内存清除,加载到磁盘上。LRU 算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。


c. 字典并发构建:

字典会存在并发构建的问题,分别使用 MVCC 以及 Zookeeper 分布式锁的方案。


① 使用 MVCC ( 持久化在 hdfs 上 ) ,在读取的时候会使用最新版本,拷贝到 working 中进行构建,构建完成生成新的版本号;如历史数据过多,会根据版本个数及 TTL 进行清理。


② 构建字典的过程中会操作临时目录,如果存在多个进程同时去写临时目录,会存在冲突问题;因此引入 Zookeeper 分布式锁,基于 DataSource 和列来做唯一的定位,从而保证同一个字典同时只能有一个进程。


d. 精确去重的实现

① 新增 unique 指标存储


  • 使用 ComplexMetricSerde 定义一个指标如何 Serde,即序列化与反序列化;

  • 使用 Aggregator,让用户定义一个指标如何执行聚合,即上卷;

  • 使用 Aggregator 的 Buffer 版本 BufferAggregator 作为 Aggregator 的替代品;

  • 使用 AggregatorFractory 定义指标的名称,获取具体的 Aggregator 。


② 精确去重的整体流程介绍



  • 使用 DetermineConfigurationJob 计算最终 Segment 分片情况。

  • 使用 BuildDictJob,Map 将同一去重列发送到一个 reducer 中 ( Map 端可先 combine ) ;每个 reducer 构建一列的全局字典;每列字典构建申请 ZK 锁。

  • IndexGeneratorJob,在 Map 中加载字典,将去重列编码成 int;Reducer 将 int 聚合成 Bitmap 存储;每一个 reducer 生成一个 Segment。


此套精确去重的使用方法:


  • 导入时,定义 unique 类型指标


  • 查询时,使用 unique 类型查询



e. 性能优化与测试

① 优化字典查询


· 存在一个超高基数列 ( UHC ) 的去重指标:在 IndexGenerator 之前增加 ClusterBy UHC 任务,保证每个 map 处理的 uhc 列数据有序,避免多次换入换出。


· 存在多个超高基数列的去重指标:拆成多个 DataSource,调大 IndexGenerator 任务的 map 内存,保证字典能加载到内存


② Bitmap 查询优化


减少 targetPartitionSize 或调大 numShards,增加 Segment 个数,提升并行度。


使用 Bitmap 做 or 计算时候,建议使用 batchOr 方法,而不是逐行 or,避免 I/O 与 or 计算交替,缓存持续被刷新导致性能降低。关于 batchOr 的选择,rolling_bitmap 提供一些接口,默认选择 naive_or ( 尽量延迟计算模型 ) ,通常情况下其性能较好;priorityqueue_or 使用堆排序,每次合并最小两个 bitmap,消耗更多内存,官方建议 benchmark 后使用;如果结果是较长的连续序列,可以选择手动按顺序依次 inplace or。


另一个思路是 Croaring-high performance low-level implementation


Rolling bitmap 用 C 语言实现了 Simd,相对于 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了 80%。


快手将 Smid 以 JNI 的方式引入,对 100W 的 Bitmap 做 Or 操作,使用 Java 版本单线程操作,计算用时 13s;用 JNI 调用 Croaring,数据进行反序列化(需要 memcpy 用时 6.5s),计算消耗 7.5s,总用时 14s(慢于 java 版本)。Croaring 暂时不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,导致其实际运行效率与官方称的 80%提升不一致。


③ 传输层编码


Broker 与 Historical 之间通过 http 协议交换数据,默认打开 gzip 压缩 ( 也可以选择不压缩,即 plain json ) 。测试发现 gzip 压缩的过程中会耗用大量 CPU,因此在万兆的网络下建议不压缩。



④ 对上述查询优化步骤依次性能测试:


8 个维度,10 亿基数的数据,选择列 author_id 作为其去重指标,摄入后达 150W 行;10 台 historical 机器:



未做优化去重,查询时间 50s;


增加 Segment 数量至 10,查询时间为 7s,提升约 7 倍性能;


将 Or 的策略设置为 BatchOr,查询时间为 4s;


关闭 gzip 压缩,查询时间为 2s。


以上是对 10 亿基数数据作去重的查询时间;如果数据基数在 1 亿以下,查询时间为毫秒级。


★ Druid 其他方面做的改进

1. 资源隔离部署方案


该方案为 Druid 官方推荐部署方案,充分利用 Druid 的分层特质性,根据业务,根据数据冷热,分到不同 proxy 上,保证各个业务相互不受影响。


2. 物化视图

物化视图是包括一个查询结果的数据库对象,可以将其理解为远程数据的的本地副本,或者用来生成基于数据表求和的汇总表,这些副本是只读的。


如果想修改本地副本,必须用高级复制的功能;如果想从一个表或视图中抽取数据时,可以用从物化视图中抽取。物化视图可以通过数据库的内部机制可以定期更新,将一些大的耗时的表连接用物化视图实现,会提高查询的效率。


a. 维度物化

Druid 社区 0.13 开始具备物化视图功能,社区实现了维度的物化。应用场景如下:原始数据有很高维度,而实际查询使用到的维度是原始维度的子集;因此不妨对原始维度做小的聚合 ( 类似 Kylin 中的 cube 和 cuboid ) ;对经常查询的部分维度做物化。



b. 时序物化

除了维度上的物化,还包括时序上的物化,例如将原始数据按照小时聚合、按照分钟聚合。



c. 物化效果


这里,主要关心物化膨胀率和物化命中率,两项指标会影响最终的查询效率。物化膨胀率,表示物化后的数据存储耗用资源情况;物化命中率,可以描述物化后与实际应用场景的匹配度。表格中,ds4 和 ds5 都做到了物化膨胀率很低,物化命中率很高,因此,查询效率可获得 7~9 倍的提升。


3. Historical 快速重启

快手数据平台硬件资源,大约是 12 块容量 2T 的 SATA 硬盘,数据量平均为 10W segments,数据占用 10T 空间,重启一次大约 40min。


Druid 中,默认重启的时候会加载全部数据;考虑到超过 10T 的元信息在启动时并不需要,可以将加载数据推迟到查询时候。因此,使用 Guava Suppliers.memoize 命令,延迟数据加载信息,只有在查询的时候才作列的加载。此过程通过一个参数 ( LazyLoad ) 控制。


( 此代码已提交 Druid 社区:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )


优化后,重启过程只需要 2min,提升 20 倍。


4. Kafka Index 方面的改进

a. TaskCount 自动伸缩

Kafka index 任务数即 TaskCount 为固定,需要按照峰值任务数设定,这样导致在非高峰时刻会存在资源浪费。


这里实施的一个优化策略是:


  • KafkaSupervisor 增加 DynamicTaskCountNotice

  • 基于 task 的 cpu use & kafka lag 阈值,进行 25%增减 task count

  • 无需重启 supervisor



b. 精细化调度

Middle Manager 的 indexing task 资源分配从 slot 改成按照内存大小分配 ( 类似于 MapReduce 从 1.0 到 2.0 的改进 ) ,具体优化方法如下:


  • 区分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要占用内存 )

  • 允许在提交 task 时指定 task 内存大小。



使用这种方式优化,实时数据任务处理可节省超过 65%的内存占用,离线数据任务处理可节省超过 87%的内存占用。


5. 元数据交互

元数据 ( Metadata ),又称中介数据、中继数据,主要是描述数据属性 ( property ) 的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能,是关于数据的组织、数据域及其关系的信息,可以看作是一种电子式目录。简言之,元数据就是关于数据的数据。


a. Overlord 与 MySQL 交互优化

Overlord 节点负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方,Overlord 有两种运行模式:本地模式或者远程模式 ( 默认本地模式 ) 。


Overlord 控制台可以查看等待的任务、运行的任务、可用的 worker,最近创建和结束的 worker 等。


Segment 是 Druid 中最基本的数据存储单元,采用列式的方式存储某一个时间间隔 ( interval ) 内某一个数据源 ( dataSource ) 的部分数据所对应的所有维度值、度量值、时间维度以及索引。


Segment 的逻辑名称结构为:


<dataSource><intervalStart><intervalEnd><version><partitionNum>


<dataSource> 表示数据源 ( 或表 ) ;<intervalStart> 和 <intervalEnd> 分别表示时间段的起止时间;<version> 表示版本号,用于区分多次加载同一数据对应的 Segment;<partitionNum> 表示分区编号 ( 在每个 interval 内,可能会有多个 partition )


Segments 在 HDFS 上的物理存储路径下包括两个文件:descriptor.json 和 index.zip 。前者记录的是 Segment 的描述文件(样例见下表),其内容也保存在 Druid 集群的元数据的 druid_segments 表中;index.zip 则是数据文件。



描述文件descriptor.json样例:{"dataSource": "AD_active_user","interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00","version": "2018-04-01T00:04:07.022+08:00","loadSpec": {"type": "hdfs","path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"},"dimensions": "appkey,spreadid,pkgid","metrics": "myMetrics,count,offsetHyperLogLog","shardSpec": {"type": "numbered","partitionNum": 1,"partitions": 0},"binaryVersion": 9,"size": 168627,"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"}

复制代码


将 druid_segments 增加索引 ( dataSource,used,end ) ,查询时间从 10s 优化到 1s 。


b. Coordinator 与 MySQL 交互优化

Druid 的 Coordinator 节点主要负责 Segment 的管理和分发。具体的就是,Coordinator 会基于配置与历史节点通信加载或丢弃 Segment 。Druid Coordinator 负责加载新的 Segment,丢弃过期的 Segment,管理 Segment 的副本以及 Segment 的负载均衡。


Coordinator 会根据配置参数里的设置的事件定期的执行,每次执行具体的操作之前都会估算集群的状态。与 broker 节点和 historycal 节点类似的,Coordinator 节点也会和 Zookeeper 集群保持连接用于交互集群信息。同时,Coordinator 也会和保存着 Segment 和 rule 信息的数据源保持通信。可用的 Segment 会被存储在 Segment 的表中,并且列出了所有应该被集群加载的 Segment 。Rule 保存在 rule 的表中,指出了应该如何处理 Segment 。


① Segment 发现


将 Coordinator 中默认的全量扫描 druid_segments 表改成增加读取,druid_segments 添加索引 ( used,created_date ) ,查询时间从 1.7min 优化到 30ms 。


② 整个协调周期


  • Segment 之前使用 TreeSet 按时间排序确保优先 load 最近的 segment 现在优化成 ConcurrentHashSet,并分成最近一天和一天前的两个 set,先操作最近一天的;

  • Apply rules 的时候对 LoadRule 先判断集群副本和配置的是否一致,是的话就跳过,提升并发能力;

  • Cleanup 和 overshadow 两个操作合并;

  • 查询时间从 3min 优化到 30s 。


★ 快手 Druid Roadmap

1. 在线化

今年会实现多集群、高可用,着重保证在线业务的 SLA,同时线上业务也要着重权限的管理。


2. 易用性

实现对 SQL 的支持,同时快手的 BI 产品也会做相应的升级和优化。


3. 性能及优化 ( Druid 内核方面,与社区合作 )

  • 自适应的物化视图,智能化

  • 数值型索引

  • 向量化引擎


最后附上我们提交社区的代码:


https://github.com/apache/incubator-druid/pull/7594( 精确去重功能 )


https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重启 )


作者介绍:

邓钫元,快手大数据架构团队研发工程师,毕业于浙江大学,曾就职于百度、贝壳,目前负责快手 Druid 平台研发工作,多年底层集群以及 OLAP 引擎研发、分布式系统的优化经验,热衷开源,为 hadoop / kylin / druid 等社区贡献代码。


本文来自 DataFun 社区


原文链接


https://mp.weixin.qq.com/s/jDW1sordtki-O5-tsVE94g


2019 年 5 月 27 日 08:0022476

评论

发布
暂无评论
发现更多内容

故事(上)

m小幼

抖音快手短视频平台获客系统开发内容

开發I852946OIIO

华为海外女科学家为您揭秘:GaussDB(for MySQL)云栈垂直集成的力量有多大?

华为云开发者社区

数据库 云数据库 GaussDB(for MySQL) 云栈 事务数据库服务

《MySQL系列》 InnoDB行记录存储结构

Silently9527

MySQL 面试 innodb innodb行记录

李欲晓:加强关键信息基础设施安全保护的法治基石

郑州埃文科技

前端基础四之JavaScriptDOM与事件

ベ布小禅

8月日更

测试开发之系统篇-Docker常用操作

禅道项目管理

Docker 容器 测试开发

接口文档生成工具 一键生成文档 ApiPost

CodeNongXiaoW

项目管理 前端 测试 后端 接口管理工具

Go 语言, 一文彻底搞懂 map 实现原理

微客鸟窝

Go 8月日更

百度世界2021:百度大脑升级、昆仑芯2量产、智能云加速AI落地爆发

百度大脑

人工智能 百度大脑

抖音快手短视频SEO营销系统软件开发价格

开發I852946OIIO

Go语言那些事儿之浅谈协程并发竞争资源问题

Regan Yue

Go 8月日更

基于Serverless架构的社区文章管理小工具

刘宇

接口管理工具APIPOST的预/后执行脚本里,常见的响应参数变量和常用方法集合——apipost

Proud lion

前端 后端 Postman 开发工具 接口文档

【架构实战营】模块五作业

Abner S.

#架构实战营

fil为什么会暴涨?fil暴涨还会持续吗?

区块链 分布式存储 IPFS fil币价行情 fil币会大涨吗?

用手机写代码:基于Serverless的在线编程能力探索

刘宇

抖音快手短视频营销软件系统开发案例

开發I852946OIIO

科技平台与社会的和谐相处

CECBC区块链专委会

抖音快手短视频SEO系统开发

开發I852946OIIO

Android SDK 启动退出方案演进

神策技术社区

前端 后端 代码 数据采集

【从零开始学爬虫】采集当当网图书商品信息

前嗅大数据

大数据 爬虫 数据采集

缓存 | Redis 缓存避坑指南

RadonDB开源社区

数据库 redis

带你读AI论文丨用于目标检测的高斯检测框与ProbIoU

华为云开发者社区

算法 数据集 目标检测 高斯检测框 ProbIoU

短视频go研发框架实践

百度Geek说

百度 架构 后端 短视频 hulk

数字人民币银银合作以及平台接入的模式分析

CECBC区块链专委会

菜谱系统小成阶段,Python Web 领域终于攻占一个小山头

梦想橡皮擦

8月日更

接口测试的时候如何一键获取cookie,并在其他接口引用

与风逐梦

软件测试 接口测试 Cookie

抖音快手短视频询盘系统开发

开發I852946OIIO

一文带你了解NB-IoT标准演进与产业发展

华为云开发者社区

物联网 IoT NB-IoT

web技术分享| 实现WebRTC多个对等连接

anyRTC开发者

音视频 WebRTC JavaScrip web技术分享

数据cool谈(第1期)数据库寻路,开源有态度

数据cool谈(第1期)数据库寻路,开源有态度

快手 Druid 精确去重的设计和实现-InfoQ