大数据分析与列数据库

阅读数:9136 2013 年 1 月 10 日

话题:大数据语言 & 开发架构AI

近年来随着数据量的激增,对于数据分析的需求也日益迫切,传统的 RDBMS 已经远远不能满足企业对大数据分析的需求,虽然很多厂商都声称自己具有列数据库的特性,但是绝大多数都不具备处理真正大数据的能力,在今年 8 月份,Google 在 VLDB 2012 大会上发表了 << Processing a Trillion Cells per Mouse Click>> 论文 [1],展示了 Google 新的大数据分析技术 PowerDrill, 本文将借用这篇论文的实验数据,结合笔者的上一篇 Hadoop 文件格式 [2] 的内容介绍更多大数据分析中列数据库的核心原理, 希望读者能对列数据库的原理有更多了解,也希望对将来 Hadoop 在针对数据分析方面能够有更多优化, 并对一些忽悠的厂商和空喊口号的技术有辨别能力。

列文件格式和压缩

在常见的列数据库技术中,一个总是被混淆的概念是面向列储存和面向列的压缩 (Column storage and Columnar compression, 见参考资料 [3]) , 面向列储存指的是将同类数据放在一起,这类数据在物理磁盘和物理内存上表现为连续空间,也就是我们熟称的”将不同列分开放”(这个描述并不准确但是更容易理解), 而面向列的压缩是指将不同的数据以更小的代价存放在磁盘或内存中,它往往包括非常高效的编码和解码技术 (Encoding and Decoding) , 比如 Run Length Encoding , BitVector Encoding , 真正的列数据库中会包括与这些压缩格式相对应的延迟物化技术 (later Materialization), 高效的压缩格式和延迟物化特性是真正列数据库和伪列数据库之间查询性能和集群吞吐能力的最主要差别.

高效压缩之 Run length Encoding

Run length Encoding 将同一列的连续数据压缩成它的实际数值和这个数值出现的连续次数,比如 AAABBBBBCCCCCCC 这样一个包含 15 条数据的某列数值,run length encoding 会将它压缩成一个三元数组 (实际值, 起始位置, 个数),比如上面的数值会压缩成 [A,1,3][B,4,5][C,8,7] 的格式, 从而使原始的数据无论在磁盘还是内存中都可以占用更少的空间,由于 run length encoding 的特性,数据往往需要重新排序从而得到更好的结果,在实际生产环境中, 性别,年龄,城市等选择性非常高的列往往都是 run length encoding 处理的对象. 在列数据库中数据往往会经过多层排序,比如第一层排序为性别,第二层排序为年龄,第三层排序为城市, 即使那些本来选择性不算高的列,在排序之后的小范围区间内也可能使类似的记录满足 run length encoding 的压缩条件,从而使记录更加适合压缩.

高效压缩之 Bit-Vector Encoding

Bit-vector encoding 是数据仓库中最常用的优化手段, 行数据库中使用的一般为 bitmap index, 它一般只针对单个列而且是额外的存储结构,列数据库中的 bit-vector encoding 主要针对数据本身而且含有较少的唯一值才进行编码,在这种编码中, 会先储存所有出现过的值,然后使用 bit 数字 1 来表示实际这个数值是否出现在列中,其他 bit 位用 0 来表示. 比如某个 chunk 的数值为:

复制代码
A A C C D D A B E

Bit-Vector encoding 会使用 ABCDE 这样的字典来储存实际的值,然后使用:

110000100 : 对应 bit-string 值 A

000000010 : 对应 bit-string 值 B

001100000 : 对应 bit-string 值 C

000011000 : 对应 bit-string 值 D

000000001 : 对应 bit-string 值 E

在上面的例子中,每一个 bit-string 的值没有为 8 的倍数,所以后面的 bit 位就被浪费掉了. 当唯一值越多,需要编码的数值越多,则整个向量空间越稀疏并且消耗就越大,这是行数据库中 bitmap index 低效且不可伸缩的关键因素 (包括 Hive 0.8 引入的 bitmap index 也是如此). 在列数据库中,因每一部分数据块单独存放 (PowerDrill 假设一个数据块大概存放 2 千万条记录), 并且每个数据块都只维护自己的少量唯一值,所以 Bit-Vector encoding 所消耗的空间无论是磁盘还是内存都极少并且不会有伸缩方面的问题. 在处理 Bit-Vector encoding 的计算时,比如对应上面例子某个查询需要知道字符串为 B 的个数时,只需要将 bit-string B 进行位操作即可得到, 这比普通的 hashmap 计数器的计算方式会快上几个数量级. Bit-Vector Encoding 一般并不要求数据本身排序,但是近来有研究表明不管是对数据表的列顺序还是列顺序一定情况下行数据的重新排序都会使 Bit-Vector Encoding 使用更少的磁盘和内存空间 [6][7]. Google PowerDrill 通过实验证明对行数据重新排序会对查询性能有 1.2 倍到 2.8 倍的提升,并且内存比不排序的情况下消耗更少.

Trie Encoding

Trie 数据结构一般也叫 prefix trees, 一般用在数据类型为 string 并且排序之后有明显倾斜的数据分布的列,比如 URL , 家庭住址, 这些字段的前缀经过排序之后在局部区域往往都有很高的压缩比,在最近的 Hbase 里面也使用了这种方式压缩 rowKey 的部分,Google PowerDrill 也同时使用 Trie Encoding 压缩由”字典表”和”字典表所在位置”所组成的文件格式及其对应的内存数据结构.

其他编码方式和压缩

编码和解码不同于压缩,编码和解码一般针对特殊数据和特殊的类型,一般消耗的 CPU 也远远小于压缩所消耗的 CPU,更多时候需要对数据重新排序才能取得更好的压缩比,这种排序既包括列的选择性高低也包括在局部地区重新调整行的顺序. 除了上面提到的 Power Drill 常使用的 Run length Encoding , Bit-vector encoding ,Trie Encoding 之外, 常见的编码还包括:

  • 针对字符或文本的”Dictionary Encoding”: 比如 email 地址, 家庭地址. 这种编码一般不需要排序,主要为了节省储存空间和少量内存空间,对查询处理时间有所提升.
  • 针对固定间隔类型数据的”Delta Encoding” : 比如日期,时间,时间戳和等间距长的数据类型,一般不需要排序,针对特殊应用,比如定时输出数据的监控系统(主机负载,网络流量等), 这种编码无论磁盘还是内存的压缩率都极高, 并且对应查询处理时间也有明显提升.
  • 有时候为了编码会将两个或多个字段进行组合, 使用 Trie Encoding 或者”变长间隔编码”进行处理,这些编码只在非常特殊的数据类型或者数据倾斜下使用,有时候只减少磁盘空间而对查询时间没有提升,甚至使用不当会增加 CPU 解码的负担而提升效果较小.

Run length Encoding 和 Bit-Vector Encoding 一般对某些列压缩会减少储存 3-4 个数量级,对内存提升也有 2-3 个数量级,Dictionary Encoding 和 Trie Encoding 一般对磁盘空间减少大概 20 倍,对内存空间大概减少 5 倍,根据 Google PowerDrill 的实验,在常见的聚合查询中这些特殊的编码方式会对查询速度一般有 2-3 个数量级不等的提升.

上面描述的只是最简单的情况,实际生产环境中要比这复杂的多,run length encoding , Bit-Vector Encoding 和 Dictionary Encoding 在何时使用的情况比较好判断,有些其他的编码方式则会出现时间和空间转换比率的权衡问题,比如 run length encoding 对于连续出现次数小于几十以下的情况提升就不明显,还有 Dictionary Encoding 在建立字典表的时候对于重复次数小于多少次的字符串就不储存在字典表中,以免压缩比率提升不大反而解压缩的消耗反而大增. 至于具体的阈值怎么取舍,根据不同的集群硬件情况,数据分布情况,需要作出不同的调整.

压缩对编程语言垃圾回收也非常有提升,因为在物理内存上更加连续,使得 gc 的处理时间缩短, 操作系统 page cahe 换入换出更快, 具体可以查看参考资料 [3]

压缩往往是针对原始二进制数据, 对不同数据类型的提升差别不大,生产系统中一般同时使用编码解码和压缩. 常用的压缩算法包括 gzip,bzip,LZO,snappy 等,在 Google PowerDrill 的实验中,对已经使用特殊编码和解码的数据继续压缩对内存和处理效率已经提升不大,甚至有可能为了减少内存空间而增加查询处理的延迟,google 权衡考虑 cpu 的消耗,内存提升效率,解压缩的速度而使用了一个修改版的 LZO 压缩算法, 并称相比较已经编码和解码的数据,修改版的 LZO 压缩还能提高 1.4 倍到 2 倍的磁盘空间和 10% 的内存空间,同时解压缩速度比 google 开源的 snappy 要快。 为了避免对应的查询处理延迟问题,Google 同时使用一个 2 层缓存,将更加常用的热点数据不压缩只编码,将较少使用的温热数据压缩,为了保证缓存的有效利用,根据不同的数据访问频次会在 2 层缓存中交换以节省内存消耗.

Skip List 数据结构

在主要以查询为主的数据仓库中,某些列的唯一值会远远多余其他列,比如性别,年龄,城市,商品类别,这些唯一值更少的列往往会作为查询的过滤条件,所以最早的 C-Store 论文 [8] 提出 Projections 的概念,将那些经常作为过滤条件并且可以过滤掉大部分值的列作为排序字段,这些字段以不同的组合组成不同的先后顺序,比如一种 Projections 以性别为第一排序字段,城市为第二排序字段,这在那些主要关注性别和城市的查询时候会过滤掉大部分的数据,对于那些关注年龄和商品类别的查询,可以创建另外一个以年龄为第一排序字段,商品类别为第二排序字段的 Projections. 完全从磁盘和内存压缩的角度来讲,以选择性高低为排序顺序一般会得到最大压缩比,但是查询并不一定用到这些高压缩比的字段,所以选择 Projections 的排序字段一般会选择那些唯一值较少但是又经常作为过滤条件的字段. 根据 Google 的实际生产系统经验,那些经常过滤的字段一般业务人员很容易通过领域知识得出,所以只需要按照这些过滤字段的选择性排序即可. 作为 C-Store 的商业版本 Vertica 可以使用实际的线上生产系统的负载来帮助用户选择更加合适的高效过滤字段组成 Projections,从而避免因选择不当而造成系统资源浪费.

Google PowerDrill 论文中习惯性叫 Projections 的排序字段为”组合范围分区”(composite range partitions), 这不同于常见的 RDBMS 中的分区,首先,RDBMS 的分区主要有范围分区,值分区,哈希分区或前面三种的组合,RDBMS 分区主要为了性能的提升和数据管理的简化(比如分区索引,分区删除,修复和备份),这种分区往往都是物理上进行切分,列数据库中的 Projections 主要是为了压缩效率,查询性能和提高系统的吞吐能力,使用高级压缩功能可以使磁盘容纳更多数据,更少的内存代表系统可以同时执行更多的查询,并且 Projections 的分区并不是物理分区而仅仅是逻辑分区,在一个集群的每台机器上都可以存放多个数据块, 比如 PowerDrill 中的每块数据含有 2 千万条数据, 这样系统可以实时的加载数据因为数据会平分到多个数据块 (类似于 Hbase 里面的 Distributed Log Split 原理),对于 RDBMS 的物理分区来说,数据加载往往会落到一个或少数几个热点的分区导致这些分区的数据不能实时的加载. 除了性能,集群吞吐和加载速度,具有 Projections 概念的列数据库还具有 Join 操作的优化,高可用性,随意伸缩性,容易恢复等特点,如果读者感兴趣可以参考 2005 年的 C-Store 论文 [8]

除了 Google PowerDrill 使用的这种 Projections 过滤数据,多年来也有多种 RDBMS 使用不同的方式过滤数据,比如 Netezza , Oracle 11g 的 PAX 样式的文件格式 (HIVE 里面的 RCFile 原理类似, 如果 RCFile 有过滤功能也会一样), 在一个小块里面的同一列往往聚合在一起(一个数据块一般 4M-8M 大小不等),然后提供最大值最小值来判断查询条件,这种文件格式不能使用上面提到的高级压缩功能,然后过滤效果比较低效(大部分原因在于不重新排序),同时因为过滤数据低效所以缓存使用率不高,另外一种为 BrighHouse(开源版本为 InfoBright),Google Dremel , (IBM 的 CIF[9] 如果有的话也会是这个原理) 文件格式为列的数据块放在连续空间,每个数据块为 1M 并且头部带有当前数据的最大最小值,这种格式同样因为不具备多重排序而过滤效果一般,当查询只含有一个过滤条件时,过滤效果只会比 Projections 的排序效率略差 (它是范围判断,不是唯一值判断),但是当查询含有 2 个或 2 个以上过滤条件时,这些过滤条件会组成大量的”可能需要扫描字段”从而导致过滤效果低下,进而导致缓存使用效果低下,单个查询效率低下和集群吞吐下降.

所以 Projections 的 Skip List 数据结构对比其他类似数据格式的数据过滤功能主要有以下好处:

  1. 过滤效果最好, 尤其是多个过滤条件下的过滤效果.
  2. 它能根据实际的生产系统的负载调节过滤的字段,从而进一步提高实际环境中的过滤效果.PowerDrill 会使用生产系统的统计数据将经常访问的列和那些列中经常读取的数据块分割在两个子区间中,一个放在内存,一个放在磁盘.
  3. 根据 PowerDrill 的实验结果,Skip List 平均会过滤掉 92.41% 的数据,这部分过滤掉的数据对单个查询的影响并不是主要因素,但是对整个集群的吞吐能力却有非常大的提升.
  4. 它是在大数据块的连续磁盘空间上进行过滤,避免了磁盘随机读取的高延迟消耗.
  5. 数据是随机分割进某一区域块的,避免了少数几个区域在数据实时装载,查询,负载平衡的过程中形成热点区域.
  6. 它的大小与延迟物化的性能有关系.

延迟物化 (later materialization)

在 2006 年的” Integrating Compression and Execution in ColumnOriented Database Systems” 论文中,描述了如何直接在已经编码的数据上不解码直接进行操作,每个不同的 encoding 的处理方式都不太一样,这里介绍 Dictionary Encoding 的方式: 在数据排序之后将所有出现的字符写入一个前缀的字典表,然后在实际的数值上用字典表中的数字代替,比如”家用电器电饭煲”代表 1(假设为 16 个 bit 的 short 类型),”手机数码数码相机”代表 2. 在计算这些商品销售的个数的时候,查询引擎只是简单的将字典代号中的个数相加, 这与使用 hashmap 或类似数据结构的处理方式完全不同,假设使用 hashmap 处理,会首先判断”家用电器电饭煲”这个元素在不在 hashmap 中,如果在则将其对应的计数器加一,如果不在则将这个元素放入 hashmap 中,然后将这个元素对应的计数器赋值为 1. 根据 PowerDrill 的实验数据,即使将所有的数据都放入内存中,主要由于延迟物化的特性,PowerDrill 也会比行处理速度或者 Dremel 的处理速度快上 2 到 3 个数量级.

延迟物化能极大的提升处理速度的原因在于它是缓存感知的 (Cache Conscious), 在处理单个 chunk 的数据时 (一个 chunk 一般包含几万到几十万不等的同类型单元数值),所有的数据都是在 CPU L2 Cache 里面的,不会产生 cpu cache miss 的情况,这与 hashmap 的处理方式完全不同,hashmap 大多数情况无法将所有元素放入 CPU L2 Cache 中 (现在 cpu 的 L2 cache 大小一般为 128K-256K),导致 cpu cache miss 非常高. 另外由于 chunk 的数据大部分都是排序过的, 所以实际的计数操作会更快.

为了实现延迟物化, 必须对单个 chunk 的大小有所限制,所以对应的 skip list 数据结构必须尽量平分所有的 chunk 块, 由于所有数据本身已经是经过平分的,所以这一点不难做到 (相对于物理分区当遇到数据倾斜的情况 chunk 大小就无法保证了), 最大的 chunk 所包含的单元值不能超过 CPU L2 Cache 的值,比如 CPU L2 Cache 的大小是 256K. 如果你的前缀字典压缩表使用的是 Integer 类型表示实际数值,那么单个 chunk 包含的单元值个数最好不要超过 5W 个,如果数据块中某一列的唯一值个数不超过 65536 个,那么一般使用 2 个 byte 大小的 short 类型来表示前缀压缩字典表中的实际数值, 单个 chunk 的大小就最好不要超过 10W 个. 不同的 encoding 会有不同的要求,但是总体原则是不能超过 L2 Cache 的大小.

这里描述的是最简单的延迟物化, 对于不同的 encoding 和 decoding, 延迟物化都需要有对应的 API 接口, 对应不同的操作类型比如 sum,count,avg 也需要有对应的 API 接口 , 所以延迟物化的实现都很复杂, 当数据不能使用对应的延迟物化特性时,查询引擎必须先将数据解码才能完成计算,这种情况一般发生在使用 UDF 或者需要同另外一个没有对应 encoding 的表做 join 的情况下. 如果延迟物化在一个查询的全部过程中都可以使用,一般的解码动作会发生在执行计划的最后一步,对于中间步骤比如 group by,union,join 数据会一直保持编码的格式,从而减少内存的需求和提升查询的速度.

对 Hadoop 的影响

目前的大数据分析 hadoop 已经成为了事实上的工业标准, 列数据库的相关研究无疑会对未来 hadoop 的各个方面产生重大影响. 比如未来的硬件会以更大容量的机械磁盘为主,SSD 不会对需要扫描大量数据的查询分析有任何帮助,更大的内存也会提升系统的性能和吞吐能力. 在笔者的上一篇文章中已经提到的 hadoop 中真正的列文件格式 [1] 应该具有的特性, 目前最好的列文件格式是正在开发中的 Trevni [11]. 文件格式对目前 NameNode 和 JobTracker/ResourceManager 的内存瓶颈也会有极大的帮助.Trevni 的说明书上 [12] 写着希望 Trevni 的 block size 为 1GB, 在未来如果实现了 skip list 数据结构,Trevni 的 block size 会更大. 这会帮助目前的 NameNode 至少将内存的需求降低一个数量级以上,并且集群的吞吐能力和计算能力都会得到非常大的提升. 最近公布的 Cloudera Impala 和 Apache Drill 作为 Google Dremel 项目的克隆都受到了极大的关注,不久的未来 Trevni 会作为高性能的文件格式集成进 hadoop 生态圈中. 同时目前最为接近 Google PowerDrill 目标的开源项目则是 Spark 和 Shark, 一个由 UC Berkeley 实验室开发的开源项目 [4][5], 它拥有不同于 map reduce 的执行引擎和一个主要基于内存的计算模型,在不久前发布的版本中,Shark 已经具备一个放在 JVM 堆外的内存列数据结构,这比目前 Hive 或者 Java MapReduce 所使用的内存数据结构要小的多,目前 Berkeley 和 Yahoo 的开发人员还在开发内存中的列压缩数据结构,这会进一步减少内存的占用空间和 gc 时间,也为未来实现延迟物化奠定基础,更多信息可以参考 [13][14]. 有兴趣的同学不妨多关注这几个项目.

参考资料

[1] 浅析 Hadoop 文件格式 http://www.infoq.com/cn/articles/hadoop-file-format

[1]Processing a Trillion Cells per Mouse Click http://vldb.org/pvldb/vol5/p1436alexanderhallvldb2012.pdf

[2] http://www.dbms2.com/2011/02/06/columnar-compression-database-storage/

[3] http://www.dbms2.com/2012/12/02/are-column-stores-really-better-at-compression/

[4] UC Berkeley 的 spark 项目 http://spark-project.org/

[5] UC Berkeley 的 shark 项目http://shark.cs.berkeley.edu/

[6] Reordering Columns for Smaller Indexes http://arxiv.org/abs/0909.1346

[7] Reordering Rows for Better Compression: Beyond the Lexicographic Order http://arxiv.org/abs/1207.2189

[8] C-store: A column-oriented dbms. In VLDB 2005

[9] IBM Column-Oriented Storage Techniques for MapReduce http://pages.cs.wisc.edu/~jignesh/publ/colMR.pdf

[10] Integrating Compression and Execution in ColumnOriented Database Systems

[11] Trevni 文件格式 https://issues.apache.org/jira/browse/AVRO-806

[12] Trevni Spec http://avro.apache.org/docs/1.7.3/trevni/spec.html

[13] http://www.dbms2.com/2012/12/13/introduction-to-spark-shark-bdas-and-amplab/

[14] http://www.dbms2.com/2012/12/13/spark-shark-and-rdds-technology-notes/

作者简介:

江志伟,盛大资深数据工程师,长期关注分析型 MPP 数据库和 Hadoop,喜欢收集各种商业智能平台和数据仓库厂商八卦,热爱各种性能和算法小技巧,偶有所学都记录在个人博客http://www.gemini5201314.net/ . 明年打算使用自己拙劣的编程技巧为列文件格式献上自己一份力.