如何使用 Apache Beam 对 TensorFlow 管道进行预处理?

阅读数:60 2019 年 10 月 4 日 08:00

如何使用Apache Beam对TensorFlow管道进行预处理?

如何使用Apache Beam对TensorFlow管道进行预处理?

在生产中使用机器学习时,确保在模型的离线培训期间应用的特征工程步骤与使用模型用于预测时应用的特征工程步骤相同通常是一个挑战。此外,在当今世界,机器学习模型在非常大的数据集上进行训练,因此在训练期间应用的预处理步骤在大规模分布式计算框架(例如 Google Cloud Dataflow 或 Apache Spark)上实现。因此,训练环境通常与服务环境非常不同,可能在训练和服务期间执行的特征工程之间产生不一致。幸运的是,我们现在有了 tf.Transform ,一个 TensorFlow 库,它提供了一个优雅的解决方案。

机器学习模型需要数据来训练,但是通常需要对这些数据进行预处理,以便在训练模型时有用。这种预处理(通常称为“特征工程”)采用多种形式,例如:标准化和缩放数据,将分类值编码为数值,形成词汇表,以及连续数值的分级。

在生产中使用机器学习时,确保在模型的离线培训期间应用的特征工程步骤与使用模型用于预测时应用的特征工程步骤相同通常是一个挑战。此外,在当今世界,机器学习模型在非常大的数据集上进行训练,因此在训练期间应用的预处理步骤在大规模分布式计算框架(例如 Google Cloud Dataflow 或 Apache Spark)上实现。因此,训练环境通常与服务环境非常不同,可能在训练和服务期间执行的特征工程之间产生不一致。

幸运的是,我们现在有了 tf.Transform ,一个 TensorFlow 库,它提供了一个优雅的解决方案,以确保在培训和服务期间功能工程步骤的一致性。在这篇博文中,我们将提供在 Google Cloud Dataflow 上使用 tf.Transform 的具体示例,以及在 Cloud ML Engine 上进行模型培训和服务。

如何使用Apache Beam对TensorFlow管道进行预处理?

应用于机器模拟用例的变换

ecc.ai 是一个有助于优化机器配置的平台。我们模拟物理机器(例如瓶子灌装机或饼干机器)以找到更优化的参数设置。由于每个模拟物理机器的目标是具有与实际机器相同的输入 / 输出特性,我们称之为“数字双胞胎”。

这篇博客文章将描述这个“数字双胞胎”的设计和实现过程。在最后一段中,您可以找到有关我们随后如何使用这些数字双胞胎来优化机器配置的更多信息。

tf.Transform 解释

tf.Transform 是 TensorFlow 的一个库,它允许用户定义预处理管道并使用大规模数据处理框架运行这些管道,同时还以可以作为 TensorFlow 图的一部分运行的方式导出管道。用户通过组合模块化 Python 函数来定义管道,然后 tf.Transform 随 Apache Beam 执行。tf.Transform 导出的 TensorFlow 图可以在使用训练模型进行预测时复制预处理步骤,例如在使用 TensorFlow Serving 服务模型时。

如何使用Apache Beam对TensorFlow管道进行预处理?

tf.Transform 允许用户定义预处理管道。用户可以实现预处理数据以用于 TensorFlow 训练,还可以导出将转换编码为 tensorFlow 图的 tf.Transform 图。然后可以将该变换图结合到用于推断的模型图中。

tf.Transform 建立数字双胞胎

数字双模型的目标是能够根据其输入预测机器的所有输出参数。为了训练这个模型,我们分析了包含这种关系的观察记录历史的日志数据。由于日志数据量可能相当广泛,理想情况下应该以分布式方式运行此步骤。此外,必须在训练和服务时间之间使用相同的概念和代码,而对预处理代码的改动最小。

当我们开始开发时,我们在任何现有的开源项目中都找不到此功能。因此,我们开始构建用于 Apache Beam 预处理的自定义工具,这使我们能够分配我们的工作负载并轻松地在多台机器之间切换。不幸的是,这种方法不允许我们在服务时重复使用相同的代码作为 TensorFlow 图的一部分运行(即在生产环境中使用训练模型时)。

在实践中,我们必须在 Apache Beam 中编写自定义分析步骤,计算并保存每个变量所需的元数据,以便在后续步骤中进行实际的预处理。我们在培训期间使用 Apache Beam 执行后续预处理步骤,并在服务期间作为 API 的一部分执行。不幸的是,由于它不是 TensorFlow 图的一部分,我们不能简单地使用 ML Engine 将我们的模型部署为 API,而我们的 API 总是由预处理部分和模型部分组成,这使得统一升级变得更加困难。此外,我们需要为我们想要使用的每个现有和新的转换实施和维护分析并转换步骤。

TensorFlow Transform 解决了这些问题。自宣布以来,我们将其直接整合为我们完整管道的主要构建块。

简化数字双胞胎示例流程

我们现在将专注于构建和使用特定机器的数字双胞胎。举个例子,我们将转向一个假想的布朗尼面团机器。这台机器采用不同的原始组件,然后加热并混合它们,直到产生完美的质地。我们将从批处理问题开始,这意味着数据在完整的生产批次中汇总,而不是在连续流中汇总。

数据

我们有两种类型的数据:

  • 输入数据:原料描述(绿色)和布朗尼面团机(蓝色)的设置。您可以在下面找到列名称和 3 个示例行。

如何使用Apache Beam对TensorFlow管道进行预处理?

  • 输出数据:具有这些原材料的机器设置结果:消耗的能量,输出的质量度量和输出量。您可以在下面找到列名称和 3 个示例行。

如何使用Apache Beam对TensorFlow管道进行预处理?

制作数字双胞胎

如何使用Apache Beam对TensorFlow管道进行预处理?

在这里,我们根据存储为云存储中两种不同类型文件的历史日志数据来训练系统的数字双胞胎。该数字双胞胎将能够基于输入数据预测输出数据。上图说明了我们在此流程中使用的 Google 服务。

预处理

预处理(制作训练示例)将使用 Apache Beam 使用 tf.Transform 函数完成。
预处理阶段包括 4 个步骤,代码如下:

1. 组合输入 / 输出数据并使原始数据 PCollection。

raw_data_input = (
p
| ‘ReadInputData’ >> textio.ReadFromText(train_data_file)
| ‘ParseInputCSV’>> beam.Map(converter_input.decode)
| ‘ExtractBatchKeyIn’>> beam.Map(extract_batchkey))

raw_data_output = (
p
| ‘ReadOutputData’ >> textio.ReadFromText(train_data_file)
| ‘ParseOutputCSV’>> beam.Map(converter_output.decode)
| ‘ExtractBatchKeyOut’>> beam.Map(extract_batchkey))

raw_data = (
(raw_data_input, raw_data_output)
| ‘JoinData’ >> CoGroupByKey()
| ‘RemoveKeys’>> beam.Map(remove_keys))

2. 定义将预处理原始数据的预处理功能。此函数将组合多个 TF-Transform 函数,以生成 TensorFlow Estimators 的示例。

语言:Python

def preprocessing_fn(inputs):
“”“Preprocess input columns into transformed columns.”""
outputs = {}
# Encode categorical column:
outputs[‘Mixing Speed’] = tft.string_to_int(inputs[‘Mixing Speed’])
# Calculate Derived Features:
outputs[‘Total Mass’] = inputs[‘Butter Mass’] + inputs[‘Sugar Mass’] + inputs[‘Flour Mass’]
for ingredient in [‘Butter’, ‘Sugar’, ‘Flour’]:
ingredient_percentage = inputs[’{} Mass’.format(ingredient)] / outputs[‘Total Mass’]
outputs[‘Norm {} perc’.format(ingredient)] = tft.scale_to_z_score(ingredient_percentage)
# Keep absolute numeric columns
for key in [‘Total Volume’, ‘Energy’]:
outputs[key]=inputs[key]
# Normalize other numeric columns
for key in [
‘Butter Temperature’,
‘Sugar Humidity’,
‘Flour Humidity’
‘Heating Time’,
‘Mixing Time’,
‘Density’,
‘Temperature’,
‘Humidity’,
]:
outputs[key] = tft.scale_to_z_score(inputs[key])
# Extract Specific Problems
chunks_detected_str = tf.regex_replace(
inputs[‘Problems’],
‘.chunk.
‘chunk’,
name=‘Detect Chunk’)
outputs[‘Chunks’]=tf.equal(chunks_detected_str,‘chunk’) return outputs

3. 使用预处理功能分析和转换整个数据集。这部分代码将采用预处理功能,首先分析数据集,即完整传递数据集以计算分类列的词汇表,然后计算平均值和标准化列的标准偏差。接下来,Analyze 步骤的输出用于转换整个数据集。

transform_fn = raw_data | AnalyzeDataset(preprocessing_fn)
transformed_data = (raw_data, transform_fn) | TransformDataset()

4. 保存数据并序列化 TransformFn 和元数据文件。

transformed_data | “WriteTrainData” >> tfrecordio.WriteToTFRecord(
transformed_eval_data_base,
coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))

_ = (
transform_fn
| “WriteTransformFn” >>
transform_fn_io.WriteTransformFn(working_dir))

transformed_metadata | ‘WriteMetadata’ >> beam_metadata_io.WriteMetadata(
transformed_metadata_file, pipeline=p)

训练

使用预处理数据作为TFRecords,我们现在可以使用 Estimators 轻松训练带有标准 TensorFlow 代码的 TensorFlow 模型。

导出训练的模型

在分析数据集的结构化方法旁边,tf.Transform 的实际功能在于可以导出预处理图。这允许您导出 TensorFlow 模型,该模型包含与训练数据完全相同的预处理步骤。
为此,我们只需要使用 tf.Transform 输入函数导出训练模型:

tf_transform_output = tft.TFTransformOutput(working_dir)
serving_input_fn = _make_serving_input_fn(tf_transform_output)
exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)
estimator.export_savedmodel(exported_model_dir, serving_input_fn)

当 _make_serving_input_fn 功能是,你可以简单地不同的项目之间重用的项目逻辑,无论一个非常普遍的功能:

语言:Python

def _make_serving_input_fn(tf_transform_output):
raw_feature_spec = RAW_DATA_METADATA.schema.as_feature_spec()
raw_feature_spec.pop(LABEL_KEY)

def serving_input_fn():
raw_input_fn = input_fn_utils.build_parsing_serving_input_fn(
raw_feature_spec)
raw_features, _, default_inputs = raw_input_fn()
transformed_features = tf_transform_output.transform_raw_features(
raw_features)
return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)

return serving_input_fn

使用数字双胞胎

如何使用Apache Beam对TensorFlow管道进行预处理?

数字双胞胎示例流程的最后一部分使用保存的模型根据输入预测系统的输出。这是我们可以充分利用 tf.Transform 的地方,因为这使得在 Cloud ML Engine 上部署“TrainedModel”(包括预处理)变得非常容易。 
要部署训练模型,您只需运行 2 个命令:

gcloud ml-engine models create MODEL_NAME
gcloud ml-engine versions create VERSION --model=MODEL_NAME --origin=ORIGIN

现在,我们可以使用以下代码轻松与我们的数字双胞胎进行交互

def get_predictions(project, model, instances, version=None):
service = discovery.build(‘ml’, ‘v1’)
name = ‘projects/{}/models/{}’.format(project, model)

复制代码
if version is not None:
name += '/versions/{}'.format(version)
response = service.projects().predict(
name=name,
body={'instances': instances}
).execute()
if 'error' in response:
raise RuntimeError(response['error'])
return response['predictions']

if name == “main”:
predictions = get_predictions(
project="<project_id>",
model="<model_name>",
instances=[
{
“Butter Mass”: 121,
“Butter Temperature”: 20,
“Sugar Mass”: 200,
“Sugar Humidity”: 0.22,
"Flour Mass ": 50,
“Flour Humidity”: 0.23,
“Heating Time”: 50,
“Mixing Speed”: “Max Speed”,
“Mixing Time”: 200
}]
)

ecc.ai,我们使用数字双胞胎来优化物理机器的参数。
简而言之,我们的方法包括 3 个步骤(如下图 1 所示):

  1. 使用历史机器数据创建模拟环境。机器的这种“数字双胞胎”将作为允许增强剂学习最佳控制策略的环境。
  2. 使用数字双胞胎使用我们的强化学习(RL)代理查找(新)最佳参数设置。
  3. 使用 RL 代理配置真实机器的参数。

如何使用Apache Beam对TensorFlow管道进行预处理?

总结

通过 tf.Transform,我们现在已将我们的模型部署在 ML Engine 上作为 API,作为特定布朗尼面团机的数字双胞胎:它采用原始输入功能(成分描述和机器设置)并将返回预测输出机。
好处是我们不需要维护 API 并且包含所有内容 - 因为预处理是服务图的一部分。如果我们需要更新 API,那么所有需要做的就是使用新版本刷新模型,并自动为您更新所有相关的预处理步骤。
此外,如果我们需要为另一个布朗尼面团机器(使用相同数据格式的机器)制作数字双模型,但是在不同的工厂或设置中运行,我们可以轻松地重新运行相同的代码而无需手动调整预处理代码或执行自定义分析步骤。

作者介绍

张海涛,目前就职于海康威视云基础平台,负责海康威视在全国金融行业 AI 大数据落地的基础架构设计和中间件的开发,专注 AI 大数据方向。Apache Beam 中文社区发起人之一,如果想进一步了解最新 Apache Beam 和 ClickHouse 动态和技术研究成果,请加微信 cyrjkj 入群共同研究和运用。

系列文章传送门:

系列文章第一篇 《Apache Beam 实战指南 | 基础入门》

系列文章第二篇 《Apache Beam 实战指南 | 手把手教你玩转 KafkaIO 与 Flink》

系列文章第三篇 《Apache Beam 实战指南 | 手把手教你玩转大数据存储 HdfsIO》

系列文章第四篇 《Apache Beam 实战指南 | 如何结合 ClickHouse 打造“AI 微服务”?》

系列文章第五篇 《Apache Beam 实战指南 | 大数据管道(pipeline)设计及实践》

评论

发布