解读 2015 之 Spark 篇:新生态系统的形成

阅读数:8261 2016 年 1 月 6 日 22:00

编者按:2015 年,整个 IT 技术领域发生了许多深刻而又复杂的变化,InfoQ 策划了“解读2015 ”年终技术盘点系列文章,希望能够给读者清晰地梳理出技术领域在这一年的发展变化,回顾过去,继续前行。本文是大数据解读2015 之Spark 篇,明略数据的梁堰波为大家解读Spark 在2015 年的快速发展,后续InfoQ 会有更多关于大数据生态技术的总结。

2015 年的 Spark 社区的进展实在是太快了,我发现 1 月份出版的一本参考书到现在已经有很多内容是过时的了。社区大踏步前行的同时,用户和应用案例也越来越多,应用行业越来越广泛。到年底了我们来梳理下 Spark 这快速发展的一年。

先从全局有个认识,我尝试用三句话来概括下 Spark 最主要的变化,然后在接下来的篇幅选取一些重点内容展开。

  • Spark 生态系统渐趋完善。支持的外部数据源越来越多,支持的算子越来越丰富,自身的机器学习算法越来越完善。同时在 API 支持上也有很大进步,新增加的 R 语言 API 使得 Spark 能被更多的行业所接受。
  • Spark 的应用范围和规模在不断扩大。在互联网和电子商务行业的应用不断增多、规模不断扩大的基础上,越来越多的金融、电信、制造业等传统行业也开始使用 Spark 解决他们遇到的大数据问题。
  • Spark 自身的性能和稳定性在不断提升。Tungsten 项目让 Spark 跑的越来越快,越来越多的代码贡献者和使用经验让 Spark 越来越稳定。相信明年发布的 Spark 2.0 将是一个里程碑式的版本。

转向以 DataFrame 为核心

在传统意义上 Spark 的核心是 RDD 和 RDD 之上的各种 transformation 和 action,也就是各种算子,RDD 可以认为是分布式的 Java 对象的集合。2013 年推出了 DataFrame,可以看做分布式的 Row 对象的集合。DataFrame 除了提供了比 RDD 更丰富的算子以外,更重要的特点就是有执行计划的优化器,这样用户只需要指定自己的操作逻辑,DataFrame 的优化器会帮助用户选择一条效率最优的执行路径。同时 Tungsten 优化(下一章重点讲)使得 DataFrame 的存储和计算效率比 RDD 高很多。Spark 的机器学习项目 MLlib 的 ML pipeline 就是完全基于 DataFrame 的,而且未来 Streaming 也会以 DataFrame 为核心。

 解读2015之Spark篇:新生态系统的形成

(图片引自 Databricks)

Tungsten 让 Spark 越来越快

那么为什么 DataFrame 比 RDD 在存储和计算上的效率更高呢?这主要得益于 Tungsten 项目。Tungsten 做的优化概括起来说就是由 Spark 自己来管理内存而不是使用 JVM,这样可以避免 JVM GC 带来的性能损失;内存中的 Java 对象被存储成 Spark 自己的二进制格式,更加紧凑,节省内存空间,而且能更好的估计数据量大小和内存使用情况;计算直接发生在二进制格式上,省去了序列化和反序列化时间。

像传统的 Hadoop/Hive 系统,磁盘 IO 是一个很大的瓶颈。而对于像 Spark 这样的计算框架,主要的瓶颈在于 CPU 和内存。下面看看 Tungsten 主要做了哪些优化:

1,基于 JVM 的语言带来的问题:GC 问题和 Java 对象的内存开销。例如一个字符串”abcd”理论上只有 4 个 bytes,但是用 Java String 类型来存储却需要 48 个 bytes。Spark 的改进就是自己管理内存,不用 JVM 来管理了,使用的工具是 sun.misc.Unsafe。DataFrame 的每一行就是一个 UnsafeRow,这块内存存的啥东西只有 Spark 自己能读懂。有了这种特有的二进制存储格式后,DataFrame 的算子直接操控二进制数据,同时又省去了很多序列化和反序列化的开销。

2,Cache-aware 的计算。现在 Spark 已经是内存计算引擎了,但是能不能更进一步呢,能不能更好的利用 CPU 的 L1/L2/L3 缓存的优势呢,因为 CPU 缓存的访问效率更高。这个优化点也不是意淫出来的,是在 profile 了很多 Spark 应用之后得到的结论,发现很多 CPU 的时间浪费在等待从内存中取数据的过程。所以在 Tungsten 中就设计和实现了一系列的 cache-friendly 的算法和数据结构来加速这个过程,例如 aggregations, joins 和 shuffle 操作中进行快速排序和 hash 操作。

以 sort 为例,Spark 已经实现了 cache-aware 的 sort 算法,比原来的性能提升至少有 3 倍。在传统的排序中是通过指针来索引数据的,但是缺点就是 CPU cache 命中率不够高,因为我们需要随机访问 record 做比较。实际上 quicksort 算法是能够非常好的利用 cache 的,主要是我们的 record 不是连续存储的。Spark 的优化就是存储一个 key prefix 和指针在一起,那么就可以通过比较 key prefix 来直接实现排序,这样 CPU cache 的命中率就会高很多。例如如果我们需要排序的列是一个 string 类型,那么我们可以拿这个 string 的 UTF-8 编码的前 8 个字节来做 key prefix,并进行排序。

解读2015之Spark篇:新生态系统的形成 

关于这个优化可以参见 SPARK-9457 和 org.apache.spark.shuffle.sort 下面的类,最重要的是 ShuffleExternalSorter 和 ShuffleInMemorySorter 两个类。

3,运行时代码生成

运行时代码生成能免去昂贵的虚函数调用,同时也省去了对 Java 基本类型装箱之类的操作了。Spark SQL 将运行时代码生成用于表达式的求值,效果显著。

除了这些优化,我认为还有两个很重要的变化:

1,Unified Memory Management

在以前 Spark 的内存显式的被分为三部分:execution,storage 和其他。execution 内存用于 shuffle, join, sort 和 aggregation 等操作,而 storage 内存主要用于 cache 数据。在 1.6 版本之前是通过 spark.shuffle.memoryFraction 和 spark.storage.memoryFraction 两个参数来配置用于 execution 和 storage 的内存份额。从 1.6 开始这两部分内存合在一起统一管理了,也就是说如果现在没有 execution 的需要,那么所有的内存都可以给 storage 用,反过来也是一样的。同时 execution 可以 evict storage 的部分内存,但是反过来不行。在新的内存管理框架上使用两个参数来控制 spark.memory.fraction 和 spark.memory.storageFraction。

2,Adaptive query execution

这个特性说大了就是所有数据库最核心的一个功能 query execution optimization,可以做的东西非常多。我们自己写 Spark 程序中经常会碰到一个 job 跑到最后每个分区的数据量很小的情况,这是因为以前的 Spark 不会估计下游 RDD 的每个分区的数据量大小,并根据数据量大小来调整分区个数。以前遇到这种问题就需要手工 repartition,用户自己要心里有数到哪个阶段的 RDD 的 partition 数据变多了还是变少了,需要跟着调整分区的数目,非常不灵活。从 1.6 版本开始有了部分支持,主要是能够估计在 join 和 aggregate 操作中 Shuffle 之后的分区的数目,动态调整下游 task 的数目,从而提高执行效率。

DataFrame 和 SQL API

Spark 从 API 的角度看,可以分为两大类:

  • 类似于 Python 的 Pandas 和 R 语言的 DataFrame API,用户可以使用 Scala/Java/Python/R 四种语言调用这个 API 处理数据;
  • SQL 语言 API。又分为两种:一个是普通的 Spark SQL,一种是 Hive SQL。

虽然 API 不同,但是背后解析出来的算子是一样的,DataFrame 的各种算子其实就是各种 SQL 的语法。Spark 在 SQL 语法的支持越来越丰富的同时内置的 SQL 函数得到了很大的增强,目前已经有超过 100 个这样的常用函数 (string, math, date, time, type conversion, condition),可以说最常见的 SQL 内置函数都有了。

作为一个类 SQL 的分析工具,聚合函数是非常核心的。Spark 1.5 和 1.6 在聚合函数上都有很大改进:实现了一个新的聚合函数接口,支持了一些 build-in 的聚合函数(例如 max/min/count/sum/avg/first/corr/stddev/variance/skewness/kurtosis 以及一些窗口函数等),同时基于新接口实现了相应的 UDAF 接口。新的聚合函数接口是 AggregateFunction,有两种具体的实现:ImperativeAggregate 和 DeclarativeAggregate。ImperativeAggregate 类型的聚合操作就是通过用户定义三个动作 initialize/update/merge 的逻辑来实现聚合的;而 DeclarativeAggregate 则是通过指定 initialValues/updateExpressions/mergeExpressions 这三个表达式然后通过代码生成的方式来做聚合的操作。这两种方式各有利弊,一般来说代码生成效率更高,但是像 variance/stddev/skewness/kurtosis 这样的多个表达式需要依赖同一个中间表达式的场景下,代码生成的执行路径由于不能共享中间的结果,从而导致其不如 ImperativeAggregate 效率更高,所以在 Spark 内部的实现中这几个聚合函数也是通过 ImperativeAggregate 来实现的。

SQL API 上另一个变化是可以直接在文件上进行 SQL 操作,不需要把这个文件注册成一个 table。例如支持”select a, b from json.`path/to/json/files`”这样的语法,这个应该是从 Apache Drill 借鉴过来的。

另外一个里程碑式的特性就是 Dataset API(SPARK-9999)。Dataset 可以认为是 DataFrame 的一个特例,主要区别是 Dataset 每一个 record 存储的是一个强类型值而不是一个 Row。这个强类型的值是以编码的二进制形式被存储的,这种存储格式可以不用反序列化就直接可以被上面的算子(例如 sort,Shuffle 等)操作。所以在创建 Dataset 的时候需要指定用于这个编码工作的 Encoder。

这样一些需要强类型的地方就可以使用 Dataset API,不失 DataFrame 的那些优点,同时又可以帮我们做类型检查。所以从某种角度上说这个 Dataset API 在将来是要替换掉 RDD 的。

外部数据源和 Hive 支持

这个 feature 可以说是建立起 Spark 生态系统的基础,使得 Spark 与大数据生态圈的其他组件联系起来了。可以这么理解,你无论数据是在 HDFS 上,还是在 Cassandra 里面,抑或关系型数据库里面,我 Spark 都可以拿过来做分析和处理,或者机器学习,我这边处理完了你让我写到哪去我就可以写出去。这个特性使得 Spark 成为了大数据处理的核心一环。目前 Spark 支持的外部数据源有很多种,主流的像 Parquet,JSON,JDBC,ORC,AVRO,HBase,Cassandra,AWS S3,AWS Redshift 等。 

在这些外部数据源中,Parquet 是其中最核心的,Spark 的 Parquet 支持也有了很大的改进:修复了越来越多的 bug,Parquet 的版本升级到 1.7;更快的 metadata discovery 和 schema merging;能够读取其他工具或者库生成的非标准的 parquet 文件;以及更快更鲁棒的动态分区插入;对于 flat schema 的 Parquet 格式的数据的读性能提升了大约 1 倍(SPARK-11787)。

另外在 Hive 支持方面,越来越多的 Hive 特有的 SQL 语法被加入到 Spark 中,例如 DISTRIBUTE BY... SORT 等。支持连接 Hive 1.2 版本的 metastore,同时支持 metastore partition pruning(通过 spark.sql.hive.metastorePartitionPruning=true 开启,默认为 false)。因为很多公司的 Hive 集群都升级到了 1.2 以上,那么这个改进对于需要访问 Hive 元数据的 Spark 集群来说非常重要。

机器学习算法

Spark 在机器学习方面的发展很快,目前已经支持了主流的统计和机器学习算法。虽然和单机的机器学习库相比 MLlib 还有一定的差距;但是纵观所有基于分布式架构的开源机器学习库,MLlib 是我认为的计算效率最高的。下面列出了目前 MLlib 支持的主要的机器学习算法:

Discrete

Continous

Supervised

Classification

LogisticRegression(with Elastic-Net)

SVM

DecisionTree

RandomForest

GBT

NaiveBayes

MultilayerPerceptron

OneVsRest

Regression

LinearRegression(with Elastic-Net)

DecisionTree

RandomForest

GBT

AFTSurvivalRegression

IsotonicRegression

Unsupervised

Clustering

KMeans

GaussianMixture

LDA

PowerIterationClustering

BisectingKMeans

Dimensionality Reduction, matrix factorization

PCA

SVD

ALS

WLS

下面简单说下其中一些亮点:

1,MLlib 已经有了对 Generialized Linear Model(GLM) 的初步支持:GLM 是统计学里一系列应用非常广泛的模型,指定不同的 family 可以得到不同的模型。例如“Gaussian” family 相当于 LinearRegression 模型, “Bionomial” family 相当于 LogisticRegression 模型,“Poisson” family 相当于 SurvivalRegression 模型。目前 MLlib 已经提供了这三种模型的机器学习解法和 LinearRegression 的 normal equation 解法。

下面详细说说这两种解法:一种是利用 WeightedLeastSquares(WLS) 优化方法;另一种是利用 L-BFGS 优化方法。前者是通过解 normal equation 的思路求解,只需要对所有的数据过一遍(不需要迭代)即可得到最后的模型(好像很神奇),同时可以算出像 coefficients standard errors, p-value, t-value 等统计指标帮助用户理解模型,这种解法是目前 LinearRegression 的默认解法。不过有个限制就是样本 feature 的维度不能超过 4096,但对样本数目没有限制。后者就是传统机器学习的解法,是通过迭代寻找最优解的方法。

而且目前 MLlib 也在进一步研究 IterativelyReweightedLeastSquares(IRLS) 算法,然后结合 WLS 和 IRLS 就可以使 Spark 支持类似 R GLM 的大多数功能。这样对于前面所有的三种模型都将提供两种解法,用户可以根据实际情况选择合适的解法。这个全部做完预计要在下一个版本 Spark 2.0 了。

2,ALS 算法可以说是目前 MLlib 被应用最多的算法,ALS 的数学原理其实比较容易理解,但是如何在分布式系统中高效实现是一个比较复杂的问题。MLlib 里使用分块计算的思路,合理的设计数据分区和 RDD 缓存来减少数据交换,有效的降低了通信复杂度。这个算法的实现思路充分说明了一个问题:同样的算法,在分布式系统上实现时,不同的选择会带来性能上巨大的差异。

3,目前的主要的分类模型都支持使用 predictRaw, predictProbability, predict 分别可以得到原始预测、概率预测和最后的分类预测。同时这些分类模型也支持通过设置 thresholds 指定各个类的阈值。不过目前这些预测函数还只支持批量预测,也就是对一个 DataFrame 进行预测,不支持对单个 instance 进行预测。不过单个 instance 的预测的支持也已经在 roadmap 中了。

4,RandomForestClassificationModel 和 RandomForestRegressionModel 模型都支持输出 feature importance

5,GMM EM 算法实现了当 feature 维度或者 cluster 数目比较大的时候的分布式矩阵求逆计算。实验表明当 feature 维度 >30,cluster 数目 >10 的时候,这个优化性能提升明显。

6,在深度学习方面,MLlib 已经实现了神经网络算法 MultilayerPerceptronClassifier(MLPC)。 这是一个基于前馈神经网络的分类器,它是一种在输入层与输出层之间含有一层或多层隐含结点的具有正向传播机制的神经网络模型,中间的节点使用sigmoid (logistic) 函数,输出层的节点使用softmax 函数。输出层的节点的数目表示分类器有几类。MLPC 学习过程中使用 BP 算法,优化问题抽象成 logistic loss function 并使用 L-BFGS 进行优化。

7,除了我们经常说的机器学习和数据挖掘算法,MLlib 在统计检验算法上也有很大的进步。例如 A/B test Kolmogorov–Smirnov  检验等。A/B 测试可以说是很多大数据应用的基础,很多结论最终都是通过 A/B 测试得到的。

机器学习 Pipeline

MLlib 最大的变化就是从一个机器学习的 library 开始转向构建一个机器学习工作流的系统,这些变化发生在 ML 包里面。MLlib 模块下现在有两个包:MLlib 和 ML。ML 把整个机器学习的过程抽象成 Pipeline ,一个 Pipeline 是由多个 Stage 组成,每个 Stage 是 Transformer 或者 Estimator。

以前机器学习工程师要花费大量时间在 training model 之前的 feature 的抽取、转换等准备工作。ML 提供了多个 Transformer,极大提高了这些工作的效率。在 1.6 版本之后,已经有了 30+ 个 feature transformer,像 OneHotEncoder, StringIndexer, PCA, VectorSlicer, VectorAssembler, SQLTransformer, HashingTF, IDF, Word2Vec 等。这些工具使得各种 feature 提取、转换等准备工作。

原始数据经过上面的各种 feature transformer 之后就可以丢到算法里去训练模型了,这些算法叫 Estimator,得到的模型是 Transformer。上一章节已经提到了主流的算法支持,所以现在可以得到模型了。得到的模型怎么评价好坏呢?MLlib 提供了像 BinaryClassificationEvaluator 等一系列的 evaluation 工具,这些工具里面定义了像 AUC 等一系列的指标用于评价模型的好坏。

这样一个机器学习的 pipeline 就可以跑起来了,MLlib 进一步提供了像 CrossValidator 这样的工具帮助我们做模型和 pipeline 的调优,选出最佳的模型参数等。另外一个重要特性就是 pipeline 的持久化。现在的 ML pipeline 和大多数的 Transformers/Estimators 都支持了持久化,可以 save/load。这样就可以把模型或者 pipeline 存储、导出到其他应用,扫除了 MLlib 在生产环境应用的最后一个障碍。

另外一个显著变化就是 ML 框架下所有的数据源都是基于 DataFrame,所有的模型也尽量都基于 Spark 的数据类型 (Vector, Matrix) 表示。在 ML 里面的 public API 下基本上看不到对 RDD 的直接操作了,这也与前面讲的 Spark 的未来变化是一致的。

Python 是机器学习领域最流行的语言,ML/MLlib 的 Python API 也在不断加强,越来越多的算法和功能的 Python API 基本上与 Scala API 对等了。

SparkR

R 语言在统计领域应用非常广泛,R 语言本身的架构可以比较容易支持其他执行后端。SparkR 就是把 Spark 作为 R 语言的执行后端。目前 SparkR 提供的接口包括了 DataFrame 的绝大多数函数以及机器学习 GLM 函数,未来还会支持更多的功能。SparkR 既可以利用 R 接口的易用性以及在传统行业的既有市场空间,又可以利用 Spark 强大的分布式数据处理能力。SparkR 在推出不久就获得了很高的用户关注度和使用率。笔者在和一些传统行业的大数据从业者交流中经常会被问到这样的问题:我以前用 R 写的程序,现在数据量大了,传统 R 跑不动了,能不能直接放到 Spark 上跑。相信 SparkR 就是在朝着解决这个问题的方向努力。

在统计和机器学习方面一个重要的 feature 就是 R formula 的支持,R 用户可以用他们非常熟悉的 formula 来定义一个 GLM 模型。目前已经支持最基本的'.', '~', ':', '+'和 '-',未来还会增强这项功能。同时 SparkR 的 GLM 函数不只提供了训练一个 GLM 模型和预测的能力,也提供了对模型的 R 类似的统计指标输出。目前 SparkR 跑出的 GLM 模型和传统 R 跑出的模型的结果是一样的,同时也会输出 coefficients standard errors, p-values, t-values 等统计信息。

Spark 2.0

Spark streaming 等组件在这一年也有很大的变化,笔者由于精力有限,在此不一一列出。Spark 2.0 预计三四月份发布,将会确立以 DataFrame 和 Dataset 为核心的体系架构,RDD 会慢慢退出历史舞台;同时在各方面的性能上会有很大的提升,当然我们也期待着稳定性方面的提升。从 1.x 到 2.x 还会放弃 Hadoop 1.x 的支持,RPC 系统从 Akka 迁移到 Netty 等。快速发展的社区,越来越多的应用,性能和稳定性方面的不断提升使得 Spark 在未来的若干年内还是大数据处理工具的首选。

作者介绍:

梁堰波,明略数据技术合伙人,开源爱好者,Apache Spark 项目核心贡献者。北京航空航天大学计算机硕士,曾就职于 Yahoo!、美团网、法国电信从事机器学习和推荐系统相关的工作,在大数据、机器学习和分布式系统领域具备丰富的项目经验。

评论

发布