NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

Analytics Zoo:在 Apache Spark 上实现分布式 Tensorflow 和 BigDL 管道的统一分析和 AI 平台

  • 2018-12-22
  • 本文字数:5824 字

    阅读完需:约 19 分钟

Analytics Zoo:在Apache Spark上实现分布式Tensorflow和BigDL管道的统一分析和AI平台
  • 如今,将深度学习应用于大数据管道往往需要手工“拼接”许多独立的组件(如 TensorFlow、Apache Spark、Apache HDFS 等),这个过程可能非常复杂,而且容易出错。

  • Analytics Zoo提供了一个在 Apache Spark 上实现分布式 TensorFlow、Keras 和 BigDL 管道的统一分析和 AI 平台,简化了这个过程。

  • 它将 Spark、TensorFlow、Keras 和 BigDL 程序无缝地合并到一个集成管道中,可以透明地扩展到大型 Apache Hadoop/Spark 集群,用于分布式训练或推理。

  • 早期用户(如世界银行CrayTalrooBaosight美的/库卡等)已经基于 Analytics Zoo 构建了分析+AI 应用程序,它可以应用于范围广泛的工作负载(包括基于迁移学习的图像分类、用于短时降水预测的 sequence-to-sequence 预测、用于推荐工作的神经协同过滤、无监督时序异常检测等等)。

  • 本文提供了几个具体的教程,介绍如何使用 Analytics Zoo 在 Apache Spark 上实现分布式 TensorFlow 管道,以及在实际的用例中使用 Analytics Zoo 实现端到端的文本分类管道。


人工智能应用程序的不断进步将深度学习带到新一代数据分析开发的前沿。特别是,我们看到越来越多的组织需要将深度学习技术(如计算机视觉、自然语言处理、生成对抗神经网络等)应用到他们的大数据平台和管道。如今,这常常需要手工“拼接”许多独立的组件(例如 Apache Spark、TensorFlow、Caffe、Apache Hadoop 分布式文件系统 HDFS、Apache Storm/Kafka 等),这可能是一个复杂且容易出错的过程。


在英特尔,我们一直与开源社区用户以及京东UCSFMastercard等合作伙伴和客户广泛合作,在 Apache Spark 上构建深度学习(DL)和 AI 应用程序。为了简化端到端的开发和部署,我们开发了Analytics Zoo,这是一个统一的分析+ AI 平台,它将 Spark、TensorFlow、Keras 和 BigDL 程序无缝地合并到一个集成管道中,可以透明地扩展到大型 Apache Hadoop/Spark 集群,用于分布式训练或推理。


早期用户(如世界银行CrayTalrooBaosight美的/库卡等)已经基于 Analytics Zoo 构建了分析+AI 应用程序,它可以应用于范围广泛的工作负载,其中包括基于迁移学习的图像分类、用于短时降水预测的 sequence-to-sequence 预测、用于推荐工作的神经协同过滤、无监督时序异常检测等等。


本文提供了几个具体的教程,介绍如何使用 Analytics Zoo 在 Apache Spark 上实现分布式 TensorFlow 管道,以及在实际的用例中使用 Analytics Zoo 实现端到端的文本分类管道。

Apache Spark 上的分布式 TensorFlow

使用 Analytics Zoo,用户可以方便地使用 Spark 和 TensorFlow 在大型集群上构建端到端的深度学习管道,如下所述。

使用 PySpark 进行数据预处理和分析

举例来说,要使用分布式的方式处理对象检测管道的训练数据,可以使用 PySpark 简单地把原始图像数据读入一个 RDD(弹性分布式数据集),这是一个跨集群分区的不可变记录集合,然后运用一些转换解码图像,并提取边界框和类标签,如下所示。


train_rdd = sc.parallelize(examples_list)  .map(lambda x: read_image_and_label(x))  .map(lambda image: decode_to_ndarrays(image))
复制代码


结果 RDD (train_rdd)中的每条记录都包含一个 NumPy ndrray 列表(即图像、边界框、类和检测到的框的数量),然后可以直接在 Analytics Zoo 上用于 TensorFlow 模型的分布式训练;这是通过从结果 RDD 创建TFDataset来完成的(如下所示)。


dataset = TFDataset.from_rdd(train_rdd,             names=["images", "bbox", "classes", "num_detections"],             shapes=[[300, 300, 3],[None, 4], [None], [1)]],             types=[tf.float32, tf.float32, tf.int32, tf.int32],             batch_size=BATCH_SIZE,             hard_code_batch_size=True)
复制代码

使用 TensorFlow 开发深度学习模型

在 Analytics Zoo 中,TFDataset 表示一个分布式元素集,其中每个元素包含一个或多个 Tensorflow Tensor 对象。然后,我们可以直接使用这些 Tensor(作为输入)来构建 Tensorflow 模型;例如,我们可以使用 Tensorflow Object Detection API 构建一个 SSDLite+MobileNet V2 模型(如下所示)。


# 使用tensorflow对象检测api来构造模型# https://github.com/tensorflow/models/tree/master/research/object_detectionfrom object_detection.builders import model_builder
images, bbox, classes, num_detections = dataset.tensors
detection_model = model_builder.build(model_config, is_training=True)resized_images, true_image_shapes = detection_model.preprocess(images)detection_model.provide_groundtruth(bbox, classes)prediction_dict = detection_model.predict(resized_images, true_image_shapes)losses = detection_model.loss(prediction_dict, true_image_shapes)total_loss = tf.add_n(losses.values())
复制代码

在 Spark 和 BigDL 上进行分布式训练/推理

在构造好模型之后,我们可以直接在 Spark 上(利用 BigDL 框架)以分布式的方式训练模型。例如,在下面的代码片段中,我们应用迁移学习技术来训练一个在 MS COCO 数据集上预训练过的 Tensoflow 模型。


with tf.Session() as sess:    init_from_checkpoint(sess, CHECKPOINT_PATH)    optimizer = TFOptimizer(total_loss, RMSprop(LR), sess)    optimizer.optimize(end_trigger=MaxEpoch(20))    save_to_new_checkpoint(sess, NEW_CHEKCPOINT_PATH)
复制代码


在后台,从磁盘读取输入数据并进行预处理,利用 PySpark 生成 Tensorflow Tensor 的 RDD;然后,在 BigDL 和 Spark(如BigDL技术报告所述)上以分布式的方式对 Tensorflow 模型进行训练。整个训练管道可以自动从单个节点扩展到基于 Xeon 的大规模 Hadoop/Spark 集群(无需修改代码或手动配置)。


另外,模型训练好以后,我们可以使用 PySpark、TensorFlow 和 BigDL(类似于上面的训练管道)在 Analytics Zoo 上执行大规模的分布式评估/推断。或者,我们也可以使用 Analytics Zoo 提供的 POJO 风格的服务API来部署低延迟的在线服务(例如,Web 服务、Apache Storm、Apache Flink 等)模型,如下所示。


AbstractInferenceModel model = new AbstractInferenceModel(){};model.loadTF(modelPath, 0, 0, false);List<List<JTensor>> output = model.predict(inputs);
复制代码


下图显示了 Analytics Zoo 中 Apache Spark 管道上的分布式 TensorFlow 的整个工作流(包括训练、评估/推断和在线服务)。


端到端分析和 AI 管道

Analytics Zoo 还为用户提供了丰富的端到端管道分析和 AI 支持,包括:


  • 易于使用的抽象,如 Spark Dataframe 和 ML 管道支持、迁移学习支持、Keras 风格的 API、POJO 风格的模型服务 API 等等;

  • 面向图象、文本和 3D 图象的常见特征工程操作*;*

  • 内置的深度学习模型,如文本分类、推荐、对象检测、图象分类等;

  • 参考用例,如时间序列异常检测、欺诈检测、图像相似性搜索等。


使用这些高级管道支持,用户可以在几行代码中轻松构建复杂的数据分析和深度学习应用程序,如下所述。

使用 NNImageReader 将图象加载到 Spark DataFrames 中

from zoo.common.nncontext import *
from zoo.pipeline.nnframes import *
sc = init_nncontext()
imageDF = NNImageReader.readImages(image_path, sc)
复制代码

使用 DataFrames 转换处理加载的数据

getName = udf(lambda row: ...)
getLabel = udf(lambda name: ...)
df = imageDF.withColumn("name", getName(col("image"))) \
.withColumn("label", getLabel(col('name')))
复制代码

使用内置的特征工程操作处理图像

from zoo.feature.imageimport *transformer = ChainedPreprocessing(        [RowToImageFeature(), ImageChannelNormalize(123.0, 117.0, 104.0),         ImageMatToTensor(), ImageFeatureToTensor()])
复制代码


使用迁移学习 API 加载已有的 Caffe 模型,删除最后几层,冻结开始几层,追加几个新层(使用 Keras 风格的 API)


from zoo.pipeline.api.netimport *full_model = Net.load_caffe(def_path, model_path)# 删除pool5之后的层Remove layers after pool5model = full_model.new_graph(outputs=["pool5"])# 冻结从输入到res4f 之间的层,包括res4fmodel.freeze_up_to(["res4f"])# 追加几个层image = Input(name="input", shape=(3, 224, 224))resnet = model.to_keras()(image)resnet50 = Flatten()(resnet)
logits = Dense(2)(flatten)
newModel = Model(inputs, logits)
复制代码

使用 Spark ML 管道训练模型

estimater = NNEstimater(newModel, CrossEntropyCriterion(), transformer) \                .setLearningRate(0.003).setBatchSize(40).setMaxEpoch(2) \                .setFeaturesCol("image").setCachingSample(False)nnModel = estimater.fit(df)
复制代码

基于 Analytics Zoo 的真实 AI 案例

如上所述,有许多早期用户已经在 Analytics Zoo 上构建了真实的应用程序。在本节中,我们将更详细地描述如何在Microsoft Azure的 Analytics Zoo 上使用 NLP 技术构建端到端的文本分类管道。

文本分类概述

文本分类是一种常见的自然语言处理任务,其目的是将输入文本语料库分类为一个或多个类别。例如,垃圾邮件检测将电子邮件的内容分为垃圾邮件或非垃圾邮件类别。


一般来说,文本分类模型的训练包括以下步骤:收集和准备训练数据集及验证数据集、数据清理和预处理、训练模型、验证和评估模型、优化模型(包括但不限于添加数据、调整超参数、调整模型)。


Analytics Zoo 中有几个预定义的文本分类器可以开箱即用,即 CNN、LSTM、GRU。我们选择从 CNN 开始。我们在下面的文本中使用 Python API 来说明训练过程。


from zoo.models.textclassificationimport TextClassifiertext_classifier = TextClassifier(class_num, embedding_file, \
sequence_length=500, encoder="cnn", encoder_output_dim=256)
复制代码


在上面的 API 中,class_num 是这个问题中的类别数量,embedding_fileis 是预训练词向量文件的路径(目前只支持 Glove ),sequence_length 是每个文本记录中包含的单词数,encoder 是词编码器的类型(可以是 CNN、LSTM 或 GRU),encoder_output_dim 是这个编码器的输出。该模型接收词索引序列作为输入,输出标签。

数据收集和预处理

训练数据集中的每个记录包含两个字段,一个是 dialogue 和一个是 label。我们收集了数千条这样的记录,并通过手动和半自动的方式收集标签。然后,我们对原始文本进行数据清理,去掉无意义的标记和混淆的部分,并将它们转换为文本 RDD,每个记录的格式为一个(文本、标签)对。接下来,我们对文本 RDD 进行预处理,并输出我们的模型可以接收的正确形式。请确保你的数据清洗和处理对训练和预测都是一样的!


(How to get invoice …, 1)
(Can you send invoice to me…,1)
(Remote service connection failure…,2)
(How to buy…, 3)


上面是数据清理只有的文本 RDD 示例(每条记录是一个文本-标签对)。


1.数据读取


我们可以使用 Analytics Zoo 提供的 TextSet 以分布式方式读取文本数据,如下所示。


from zoo.feature.text import TextSetfrom zoo.common.nncontext import init_nncontext
sc = init_nncontext("Text Classification")text_set = TextSet.read(data_path, sc)
复制代码


2.分词


然后我们将句子分解为单词,将每个输入文本转换为一个标记(单词)数组,并对标记进行规范化(例如,删除未知字符并转换为小写)。


text_set = text_set.tokenize()   \                   .normalize()
复制代码


3.序列对齐


不同的文本可能会生成不同大小的标记数组。但是,文本分类模型要求输入的所有记录大小固定。因此,我们必须将标记数组对齐到相同的大小(在 parametersequence_lengthin 文本分类器中指定)。如果标记数组的大小大于所需的大小,则从开头或结尾对单词进行删除;否则,我们将无意义的单词填充到数组的末尾(例如“##”)。


text_set= text_set.shape_sequence(sequence_length)
复制代码


4.词索引


标记数组大小对齐后,需要将每个标记(单词)转换为索引,可用来查找其词向量(在文本分类器模型中)。在单词转换为索引的过程中,我们还通过删除文本中出现频率最高 N 个单词来移除停用词(即经常出现在文本中但无助于语义理解的单词,如“the”、“of”等)。


text_set= text_set.word2idx(remove_topN=10, max_words_num)
复制代码


5.转换成样本


经过以上步骤,每个文本都变成一个有形状的张量(sequence_length, 1),然后我们从每个记录构造一个 BigDL样本,以生成的张量作为特征,以标签整数作为标签。


text_set = text_set.generate_sample()
复制代码

模型训练、测试、评估和优化

在以相同的方式准备好训练数据集(train_rdd)和验证数据集(val_rdd)之后,我们实例化一个新的 TextClassifier 模型(text_classifier),然后创建一个优化器以分布式方式训练模型。我们使用稀疏分类交叉熵(Sparse Categorical Cross Entropy)作为损失函数。


train_set, val_set= text_set.random_split( \    [training_split, 1 - training_split])
model.compile(optimizer=Adagrad(learningrate=float(options.learning_rate), \ learningrate_decay=0.001), loss="sparse_categorical_crossentropy", \ metrics=['accuracy'])
model.fit(train_set,batch_size=int(options.batch_size), \ nb_epoch=max_epoch, validation_data=val_set)
复制代码


训练时可调整的参数包括轮数、批大小、学习率等。你可以指定输出指标的验证选项,如在训练过程中设置准确度验证,以检测过拟合或欠拟合。


如果在验证数据集上获得的结果不好,我们就必须优化模型。通常,这是指重复调整超参数/数据/模型、训练和验证的过程,直到结果足够好为止。通过调整学习速率、添加新数据和扩充停用词字典,我们的准确率得到了显著提高。


有关 Analytics Zoo 中的文本处理分类支持的更多细节,请参阅这些文档。


作者简介Jason Dai 是英特尔大数据技术高级首席工程师兼首席技术官,负责领导全球工程团队(包括硅谷和上海)开发先进的大数据分析和机器学习技术。他是 Apache Spark 的提交者、PMC 成员、Apache MXNet 导师、Strata Data Conference 北京站的联合主席以及 BigDL(Apache Spark 的分布式深度学习框架)的创建者。


查看英文原文:Analytics Zoo: Unified Analytics + AI Platform for Distributed Tensorflow, and BigDL on Apache Spark


公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2018-12-22 11:161916
用户头像

发布了 1008 篇内容, 共 374.6 次阅读, 收获喜欢 341 次。

关注

评论 1 条评论

发布
暂无评论
发现更多内容

Java三大特性(一)—封装

共饮一杯无

Java 面向对象 10月月更

如何快速对混合云环境进行安全合规检查

HummerCloud

云安全 上云合规 安全合规检测 10月月更

「趣学前端」给不懂技术的朋友简单演示,代码是怎么被编写出来的

叶一一

JavaScript 前端 10月月更

「趣学前端」日常浏览的页面是怎么实现出来的

叶一一

JavaScript 前端 10月月更

Java基础(六)| Debug模式及基础练习

timerring

Java debug 10月月更

挑战30天学完 Python:Day4数据类型-字符串str

MegaQi

挑战30天学完Python 10月月更

Python进阶(二十二)Python3使用PyMysql连接mysql数据库

No Silver Bullet

Python3 MySQL数据库 pymysql 10月月更

Python进阶(二十三)Django使用pymysql连接MySQL数据库做增删改查

No Silver Bullet

Python django MySQL数据库 pymysql 10月月更

Spring Batch 可以在一个 Step 中有多个 Tasklet 吗

HoneyMoose

PriorityQueue源码解析(一)

知识浅谈

Priority Queue 10月月更

正在消失的机器视觉公司

脑极体

有人意图取代SQL,你同意吗?

雨果

sql

跟着卷卷龙一起学Camera--双摄01

卷卷龙

ISP camera 10月月更

「CSS畅想」周期性事情怕忘,来看看一个月内都安排在哪天

叶一一

CSS JavaScript 前端 10月月更

一个实际的例子学习 SAP BTP Java 应用的 @Before 注解使用方式

Jerry Wang

Java 云原生 Cloud SAP 10月月更

首轮Zepoch节点已售罄完结,你期待次轮吗?

鳄鱼视界

【从0到1学算法】5.Bubble Sort算法-下

Geek_65222d

10月月更

大数据ELK(二十六):探索数据(Discovery)

Lansonli

discovery 10月月更

Spring之核心容器

楠羽

笔记 SP【ring 10月月更

跟着卷卷龙一起学Camera--DNG格式

卷卷龙

ISP camera 10月月更

什么是Scrum?Scrum的核心要点和精髓

laofo

Scrum 研发效能 敏捷研发

微信朋友圈架构图

Johnny

架构实战训练营9期

Spring Batch 事务限制

HoneyMoose

什么是数据产品经理?数据产品经理与传统产品经理有什么区别?

雨果

数据产品经理

大数据ELK(二十五):添加Elasticsearch数据源

Lansonli

elasticsearch 10月月更

Clickhouse:delete提交成功,数据还在

Ken

Clickhouse

【LeetCode】两个链表的第一个重合节点Java题解

Albert

算法 LeetCode 10月月更

「趣学前端」日常浏览的页面为什么可以五颜六色

叶一一

CSS 前端 10月月更

跟着卷卷龙一起学Camera--双摄02

卷卷龙

ISP camera 10月月更

树莓派4B安装64位Linux(不用显示器键盘鼠标)

程序员欣宸

树莓派 10月月更

测试需求平台5-Blueprint优化与首个vue页搭建

MegaQi

测试平台开发教程 10月月更

Analytics Zoo:在Apache Spark上实现分布式Tensorflow和BigDL管道的统一分析和AI平台 _AI&大模型_Jason Dai_InfoQ精选文章