利用 In-Database Analytics 技术在大规模数据上实现机器学习的 SGD 算法

阅读数:2906 2014 年 3 月 14 日 00:50

随着应用数据的增长,在大规模数据集上进行统计分析和机器学习越来越成为一个巨大的挑战。目前,适用于统计分析 / 机器学习的语言 / 库有很多,如专为数据分析用途而设计的 R 语言,Python 语言的机器学习库 Scikits,支持分布式环境扩展的有基于 Map-Reduce 实现的 Mahout,以及分布式内存计算框架 Spark 上的机器学习库 MLlib 等等。目前 Spark 框架也推出了 R 语言的接口 SprakR。但是,本文要讨论的,则是另外一种设计思路,在 database 中实现统计分析和机器学习算法,即 In-Database Analysis,Madlib 库就是这种设计思路的代表。

把机器学习库内置到 database 中(通过 database 的 UDF)有许多优点,执行机器学习算法时只需要编写相应的 SQL 语句就可以了,同时 database 本身作为分析的数据源,使用非常方便,大大降低了机器学习的应用门槛。当然缺点也是明显的,由于受限于 database 提供的 UDF 编程接口,实现算法时会受到很多限制,很多优化难以实现,而大规模数据集上的机器学习,尤其是需要迭代计算的,通常对算法性能和结果收敛速度要求较高,否则很难做到实用。本文的重点就是讨论如何在 In-database Analysis 的框架下,高效的实现机器学习中的 SGD(随机梯度下降)算法。由于很多机器学习算法如 linear SVM 分类器、K-mean、 Logistic Regression 都可以采用 SGD 算法来实现,只需要针对不同算法设计不同的目标函数即可。因此在 database 上实现高性能的 SGD 算法框架,便可用来执行一大类机器学习算法。

以 Madlib 为例,如果要 Madlib 用 SVM 算法对数据集做训练,可以执行如下 SQL 语句:

SELECT madlib.lsvm_classification( 'my_schema.my_train_data', 
                                   'myexpc', 
                                   false
                                );

madlib.lsvm_classification 是 Madlib 中实现的 SVM 计算函数,上述方式的调用则是采用 SGD 算法进行 linear SVM 分类,其中 my_schema.my_train_data 是训练数据表,必须满足如下结构定义:

TABLE/VIEW my_schema.my_train_table 
(       
        id    INT,       -- point ID
        ind   FLOAT8[],  -- data point
        label  FLOAT8   -- label of data point,即分类结果 
);

执行之后生成的 Model 将会被存到第二个参数’myexpec’指定的表中。第三个参数 (true/false) 指定算法是否需要并行执行。

生成 Model 表 myexpec 后,执行以下 SQL 语句就可以进行预测:

SELECT madlib.lsvm_predict( 'myexpc', 
                            '{10,-2,4,20,10}'
                          );

madlib.lsvm_classification 函数还有更多的参数可以设置。如可以设置 kernel-function,那么这时候计算实现的方式就不是 SGD 了,还可以设置迭代精度阈值等,具体参见 Madlib 文档。

从这个例子可以看出,Madlib 库的使用是非常方便的,只要我们按照库函数要求建好相应的数据表并导入数据,就可以通过调用 Madlib 库中的 SQL 函数来进行模型训练和模型预测了。目前 Madlib 支持 PostgreSQL、Greenplum、Pivotal HAWQ。

但是,在很多情形下,Madlib 的执行性能是很差的,以基于 SGD 算法实现的机器学习训练模型为例,在数据量大的情况下经常需要几十小时甚至几百小时来完成,为了提高计算速度和模型收敛速度,有两个非常值得优化的地方:一是改为并发更新模型,二是优化数据读取顺序。

首先看基本的 SGD 算法框架在 database 的 UDFA(自定义聚合函数)上的实现。以 linear SVM 为例,可以定义如下目标函数 f(x):

其中 <xi, yi> 就是训练数据,w 是训练模型。如下的梯度下降算法通过迭代找出使这个目标函数值尽可能小的 w:

其中是步长值。

但是直接按照上式计算,每迭代一次,就需要遍历所有的数据,难以在实际场景中应用,因此就有了 SGD(随机梯度下降)算法,对每一步迭代中的 f(x),近似的用随机的一个数据点来代替原有 f(x) 中对所有数据点的求和:

在原始数据顺序随机的情况下,每次迭代只需要顺次取出一条记录并按上式进行迭代即可。这种迭代更新的计算方式,和 database 中提供的 UDFA 扩展接口是非常吻合的。可以把这个计算过程抽象为三个函数:

initialize(state)
transition(state, row_data)
terminate(state)

state 用于存储这个 UDFA 计算上下文,对应 SDG 算法,state 存的就是不停被迭代更新的 model。row_data 是每次读入的一行记录,即一个数据点。terminate 函数返回最终的计算结果。具体这三个函数调用的次序如下图所示:

在 PostgreSQL 数据库中,这三个函数分别被命名为 initcond、sfunc 和 finalfunc。

在训练数据表上执行一次这样的 UDFA,相当于顺次用表中所有的数据按照上述的迭代式计算了一遍。通常要对总体数据迭代多遍才能得到想要精度的结果,因此 Madlib 库中那些训练函数实际上是用 SQL 脚本语言编写的函数,其中包含了对整个表数据的多轮迭代,直到获得所需精度的结果。如果数据表的数据顺序是有序的,那么顺次迭代将大大降低算法的收敛速度,这时可以先将数据表的数据顺序打乱再执行上述算法,如在 PostgreSQL 中,可以先用 order by random() 打乱数据顺序,然后再进行迭代计算。

上述的 UDFA 执行过程存在着两大缺陷:1. 没有并行化。2. 在数据量大的情况下,将数据次序打乱可能是一个非常大的开销。下面先讨论并行化的改进方案:

一个简单的并行化计算方法是,将表数据分为多个部分,每个部分有自己的 model state,每个部分的 model 各自进行各自的迭代计算,当各个 model 都计算完毕时再进行合并。简单的合并方案就是将所有的 model 做平均值得到最终的 model。为什么 SDG 算法可以用这种方法拆分计算然后取 model 平均值,可以参见这篇论文 [1]。当然,这要求 database 对 UDFA 提供一个 merge(state, state) 的函数接口让用户来自定义合并方式。这种简单的并行化方式不仅可以应用在单机 database 上,也可以用在数据分布到多个结点上的分布式 database/datawarehouse 上,如 Cloudera Impala。但是这种简单的 share nothing 的并行计算方案通常会极大的影响 model 的收敛速度,因此还需要寻找更好的并行执行方案。

实现不影响收敛速度的并行执行,就需要所有在不同区段数据上并发执行的 UDFA 去更新同一个 model,这就要 database 提供 Share-Memory 的接口以便于 UDFA 访问共享内存。像 PostgreSQL 就提供了相应的接口 [3]。

但是这种并发更新 model 的方式又带来了另外一个矛盾,在用迭代式更新 model 时必须对 model 加锁,假设多个 UDFA 计算线程同时工作,在同一时刻显然只有一个线程能获得锁进行计算并更新模型,本质上还是等同于单线程的计算效率。要解决这个矛盾,可以采用论文 [2] 中提出的 lock-free parallelizing SGD 算法。这个算法避免了更新 model 时对 model 加锁,核心思想是通过稀疏化 model,使得每次迭代只需要更新 model 中的很少一部分分量,然后证明在 lock-free 方式下并发更新,收敛速度有一个不错的下限。

第二个问题是如何避免大数据量情况下打乱数据次序的开销,因为如果原始数据是有序的话,可能对 model 收敛速度有很大影响,但是打乱大量数据通常又要耗费大量开销。一个简单的方法是,对所有表数据做 reservior sampling,这是一个经典的算法,通过一遍扫描原始数据集进行等概率抽样。然后对抽样数据打乱次序后在抽样数据上进行迭代。当然这种算法缺点也很明显,大量有效的数据点被舍弃了,对训练结果的准确性影响很大。为了避免这个缺陷,可以把数据抽样和按数据顺序迭代相结合,即如下的 multiplexed reservior sampling:

如上图所示,设立两个工作线程并发更新 Model,一个是 I/O 线程,分配好 buffer A;另一个是 Memory 线程,分配好 buffer B。

第一轮迭代,只有 I/O 线程工作,假设 buffer A 的容量为 M,总记录数为 N,遍历每一条记录,前 M 条记录直接填入 buffer A。从 M+1 条记录开始,假设当前为第 i 条记录,第 i 条记录有 M/i 的概率被选中,如果被选中那么替换掉 buffer A 中的随机一条记录,替换出的记录被丢弃,否则直接丢弃第 i 条记录。这时候用被丢弃的记录更新模型,直到所有记录遍历完成。

第一轮迭代后交换 buffer A 和 buffer B。第二轮以后的迭代都是 I/O 线程和 Memory 线程同时工作。Memory 线程负责循环读取出 buffer B 中的数据并用来更新 model,I/O 线程的工作和第一轮的工作方式相同。每一轮迭代过后交换 buffer 使得 memory 线程可以用到上一轮 I/O 线程采样出的数据。

基于 Hadoop 的分布式 SQL 查询引擎 Cloudera Impala 也集成了 Madlib 库,当然现在还很不成熟,感兴趣的读者可以点击:

http://blog.cloudera.com/blog/2013/10/how-to-use-madlib-pre-built-analytic-functions-with-impala/

这个项目的 Github 地址在:

https://github.com/cloudera/madlibport

这个项目把 Madlib 库和 Bismark 计算框架移植到了 Impala 上,当然好处是可以利用 Impala 的分布式处理能力,至于性能和成熟度,目前还无法期待太多,毕竟 Imapla 也才刚刚在 1.2 的版本中加入了 UDFA 的编程接口,还有太多局限的地方。

参考文献

[1]M.Zinkevich,M.Weimer,A.Smola,and L.Li.Parallelized Stochastic Gradient Descent.In NIPS,2010.

[2]F.Niu,B.Recht,C.R e,and S.Wright.Hogwild:A Lock-Free Approach to Parallelizing Stochastic Gradient Descent.In NIPS,2011.

[3] http://www.postgresql.org/docs/9.3/static/xfunc-c.html#AEN53973

作者

梁堰波,北京航空航天大学计算机硕士,美团网资深工程师,曾在法国电信、百度和 VMware 工作和实习,这几年一直在折腾 Hadoop/HBase/Impala 和数据挖掘相关的东西,新浪微博 @DataScientist。

徐伟辰,北京航空航天大学计算机硕士,IBM 中国软件开发中心研发工程师,参与开发过大数据分析平台,PaaS 平台,熟悉 Hadoop 相关技术,目前的兴趣点在大数据分析技术及相关算法和工具。


感谢吴甘沙对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

评论

发布