如何将Node.js Streaming MapReduce引入Amazon EMR

2015 年 7 月 21 日

概述

Node.js 是一套 JavaScript 框架,其核心诉求在于利用非阻塞 I/O 以及异步式事件驱动处理模型实现服务器端应用程序的高性能运行。

当客户需要处理规模庞大且复杂性较高的数据时,Node.js 能够提供一套以原生方式支持 JSON 数据结构的运行时环境。Python 及 Ruby 等编程语言都拥有面向 JSON 数据的优秀支持能力,但 Node.js 在处理包含列表及数组的结构时显得尤为得心应手。Node.js 还提供一套高性能且具备可扩展能力的备用方案,能够以原生方式将 JSON 数据作为对象加以处理。目前大家已经可以获得专门针对 Node.js 的 AWS SDK( http://aws.amazon.com/sdkfornodejs ),其允许我们将 Node.js 应用程序与 AWS 服务加以紧密整合。

在今天的文章中,大家将了解到如何在 Amazon Elastic MapReduce(简称 Amazon EMR)当中安装 Node.js、如何构建一款 Node.js 应用程序并将其与 Hadoop Streaming 并发处理架构相整合、外加如何在 AWS 之上部署并运行我们的 Node.js MapReduce 应用程序。要了解更多与 Amazon EMR 以及 Hive 项目相关之细节信息,请大家点击此处查看专题教程。

本文假定大家已经熟知与Hadoop、Hive 以及Amazon EMR 相关的专业知识。

用例

在本文当中,我们将使用来自Twitter 的数据,这部分数据中包含由推文、转发、回复以及直发信息所构成的复杂信息体系。每个文件中都包含有单一Twitter 事务数据块,我们需要将其内容写入至Amazon Simple Storage Service(简称Amazon S3)并随时加以读取。我们希望针对Hive 的实际需求对数据进行转换,旨在对转发率、每秒推文数量以及每用户直发信息数量等指标进行汇总。

样本输入数据

我们的数据表现出一套由大量Twitter 用户所构成的复杂交互图谱,其中包括推文转发与回复内容,如下图所示:

为了进行数据发现,我们可以利用Hive 配合JsonSerde(可参看 http://aws.amazon.com/articles/2854 )对数据进行调查。不过由于这套图谱非常复杂而且存在自我指涉,因此大部分 Hive JsonSerde 无法将这部分数据显示为表格。能够在 Node.js 中进行数据处理,我们能够更轻松地利用简单语法实现数据图谱导航。

安装 Node.js

我们可以利用以下 bootstrap 操作将 Node.js 安装在 Amazon EMR 集群当中:

复制代码
---bootstrap-actions Path=s3://github-emr-bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js

(如果大家此前从未接触过 bootstrap 操作,请点击此处查阅Amazon EMR 提供的说明文档。)

编写MapReduce 代码

现在Node.js 已经被安装在Amazon EMR 集群之上,而Hadoop Streaming 也得到正确配置,接下来我们需要运行自己的Node.js 应用程序以完成映射与归约操作。利用Hadoop Streaming 处理架构,我们能够为流任务指定所需要使用的映射与归约代码。

与其它Hadoop Streaming 兼容语言一样,我们必须从Amazon S3 数据存储或者HDFS 文件系统当中利用标准输入(即stdin)方式实现数据读取。在Node.js 当中,stdin 能够利用处理全局对象的方式获得可访问能力(参见 http://nodejs.org/api/process.html )。这就使我们得以访问多种控制机制,进而通过管理输入与输出数据流对数据进行读取与写入,例如 process.stdin 与 process.stdout。

我们的 MapReduce 程度必须执行五项主要函数以实现从 Hadoop Streaming 以及输出结果中读取数据。

配置标准输入

在默认情况下,process.stdin 输入通道会处于暂停状态,且不触发任何事件。在将其启用之前,我们必须首先配置所需的字符集编码。对于非多字节字符集而言,我们可以使用 UTF-8。因此,我们映射或者归约方案的主体流程应该从这里开始:

复制代码
process.stdin.setEncoding('utf8');

或者,我们也可以使用 UTF-16 实现多字节支持。在此之后,我们必须启用 stdin 对事件进行恢复以及触发:

复制代码
process.stdin.resume();

处理来自 STDIN 的输入内容

当我们的 Node.js 应用程序使用该 stdin.data 事件时,process.stdin 通道会就此发出通知——stdin.data 事件会在一定数量的数据可用于读取时被触发。我们的 Node.js 应用程序必须对这部分数据进行缓存处理以备后续使用,因为事件所提供的每一个数据块可能都仅仅属于标准输入内容中全部可用数据的一小部分。由于我们此前通过配置让 Hadoop Streaming 使用非分割式 FileInputFormat,因此我们会在单一映射器中获取到完整的 JSON 数据,并能够将该文件作为整体加以处理。有鉴于此,我们可以通过以下代码将数据块缓存于 data 事件当中:

复制代码
var line = ‘’;
// fires on every block of data read from stdin
process.stdin.on('data', function(chunk) {
// chunk and emit on newline
lines = chunk.split("\n")
if (lines.length > 0) {
// append the first chunk to the existing buffer
line += lines[0]
if (lines.length > 1) {
// emit the current buffer
emitter.emit(lineEvent,line);
// go through the rest of the lines and emit them, buffering the last
for (i=1;i<lines.length; i++) {
if (i<lines.length) {
emitter.emit(lineEvent,lines[i]);
} else {
line = lines[i];
}
}
}
}
});

上述操作会将全部数据块附加至行变量处,并在每一次发现新的换行符时触发“lineReady”事件。

处理完整的 stdin 数据

在全部来自 stdin 的数据被读取完成后,该流程将触发 stdin.end 事件。我们已经将全部数据收集到行缓冲区当中,这样我们只需要利用以下代码刷新最后一行数据:

复制代码
// fires when stdin is completed being read
process.stdin.on('end', function() {
emitter.emit(lineEvent,line);
});

每当新的内容行准备就绪时,我们都将利用以下代码将其排序至一个 JSON 对象当中:

复制代码
try {
obj = JSON.parse(line);
} catch (err) {
process.stderr.write('Error Processing Line ' + line + '\n');
process.stderr.write(err);
return;
}

我们可以选择把复杂的 JSON 数据简化为普通输出结果,以供 Hive JsonSerde 进行加载,或者选择生成 CSV 或者 XML 数据来代替。

以 Hadoop 兼容格式写入数据

对于某些特定的 MapReduce 操作类型,我们需要确保其归约器能够获取到归属于特定类型的全部数据。为了实现这一目标,我们必须指定一个键值,并保证 Hadoop 在调用该归约器之前会首先对输出结果进行分类。在进行文本内容处理时,我们会利用由\t 标签开头的字符串来表示这个值。

要执行存储或者移除方面的数据写入操作,我们需要利用 process.stdout.write() 向 stdout 实施写入。

制作可执行文件

Amazon EMR 利用命令行语法调用的方式运行映射器与归约器,例如“./mapper.js”。因此,我们需要确保我们所构建的 Node.js 模块能够通过命令行实现调用。为达成这一目标,我们在映射器或者归约器文件的开头处添加一条标准“shebang”命令,这样它就能调用 Node.js 并运行脚本内容:

复制代码
#!/usr/bin/env node

接下来,大家可以通过命令行调用的方式测试自己的映射器代码了(以下示例假定代码位于名为 Mapper.js 的文件当中):

复制代码
./mapper.js < input-file-path

部署与运行

在编写了自己的映射器与归约器之后,接下来我们将其传输至 Amazon S3 当中,而后利用 Amazon EMR 针对部分输入数据运行 MapReduce。

以下示例讲解了如何利用 Amazon EMR 命令行执行各个步骤(参见 http://aws.amazon.com/developertools/2264 ),不过大家也可以在 Amazon EMR 控制台(参见 console.aws.amazon.com/elasticmapreduce )或者 Amazon EMR API(参见 http://docs.aws.amazon.com/ElasticMapReduce/latest/API/Welcome.html?r=8857 )中利用命令实现同样的效果。我们将展示如何以自动方式利用 AWS 命令行工具运行该应用程序,但大家完全可以使用 AWS Web Console 或者 AWS Data Pipeline 完成同样的工作。我们可以使用–create-cluster 命令启动一套新的 Amazon EMR 集群,同时利用以下代码启动该集群并捃行我们的 Node.js bootstrap 操作:

复制代码
aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users
--name MyNodeJsMapReduceCluster --instance-groups  InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge
InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge --
no-auto-terminate --enable-debugging --log-uri s3:///logs --
bootstrap-actions Path=s3://github-emr-bootstrap-
actions/node/install-nodejs.sh,Name=InstallNode.js --ec2-attributes
KeyName=<my key pair>

这样我们就创建了一套始终启用的集群,其中包含配备 3.3.1 AMI、双核心节点以及一个主节点,全部采用 m3.xlarge 实例类型。以上代码同时为指定存储桶设定了调试与日志记录机制,并通过 bootstrap 操作完成了 Node.js 的启动时安装。除此之外,代码中还使用了 Amazon EC2 密钥对,从而将 SSH 安全机制引入该 Hadoop 集群。

接下来,我们将添加 Hadoop Streaming 流程,旨在处理自己的输入数据。在以下代码中,大家需要把替换为自己的实际集群 ID:

复制代码
aws emr add-steps --cluster-id <my cluster ID>
--steps Name=NodeJSStreamProcess,Type=Streaming

我们通过创建一套文件系统参考(利用—files 参数)添加自己的映射器与归约器 JavaScript 文件,而后将该基础文件名通过 -mapper 与 -reducer 进行引用:

复制代码
Args=--files,"s3://<path to mapper>/mapper.js\,s3://<path to
reducer>/reducer.js",-mapper,mapper.js,-reducer,reducer.js

而后,我们添加该输入与输出文件的位置:

复制代码
-input,s3://<path-to-input-files>,-output,s3://<path-to-output-files>

这样一来,我们就获得了如下完整命令行调用代码:

复制代码
aws emr add-steps --cluster-id <my cluster ID>
--steps Name=NodeJSStreamProcess,Type=Streaming,Args=--files,
"s3://<path to mapper>/mapper.js\,s3://<path to reducer>/reducer.js",
-input,s3://<path-to-input-files>,-output,s3://<
path-to-output-files>,-mapper,mapper.js,-reducer,reducer.js

大家只需要提供相应的时间量用于该流程运行,Hadoop 集群不再需要其它迭代实现数据生成。请确保在运行上述示例时,大家在执行完成后及时关闭自己的集群。我们可以利用以下 Amazon EMR 命令完成集群关闭操作:

复制代码
aws emr terminate-clusters --cluster-ids <my cluster ID>

总结

Node.js 能够在实现 MapReduce 应用程序快速执行效果的同时,利用简洁的原生语法对高复杂性 JSON 数据加以处理。通过 Amazon EMR 配置选项,大家可以轻松运行基于 Node.js 的应用程序,并随时间推移或者输入数据量的增加提升其规模。

附录——映射 / 归约应用程序示例

以下 MapReduce 程序旨在以天为单位将推文内容输出为高复杂性 JSON 结构化数据。在本示例中,Twitter 数据由 DataSift(来自 datasift.com)负责收集。我们去掉了其中的某些特殊字符,例如换行符与制表符,并将推文 created_at 字段输出为键。归约器随后根据日期对这部分数据加以排序,并输出推文的整体数量。

映射器

复制代码
#!/usr/bin/env node
var events = require('events');
var emitter = new events.EventEmitter();
var line = '';
var lineEvent = 'line';
var dataReady = 'dataReady';
// 移除全部控制字符,从而保证输出结果由纯文本内容构成
String.prototype.escape = function() {
return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
.replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
.replace('\b', '\\b').replace('\f', '\\f');
}
// 为此附加一套数组
Array.prototype.appendArray = function(arr) {
this.push.apply(this, arr);
}
// 数据完成后,将其写入至必要的输出通道
emitter.on(dataReady, function(arr) {
var dateComponents = arr[9].split(' ');
var d = [dateComponents[1],dateComponents[2],dateComponents[3]].join(' ');
var interaction = {
key_date : d,
content: {
objectId : arr[0],
hash : arr[1],
id : arr[2],
author_id : arr[3],
author_avatar : arr[4],
author_link : arr[5],
author_name : arr[6],
author_username : arr[7],
content : arr[8],
created_at : arr[9],
link : arr[10],
schema_version : arr[11],
source : arr[12]
}
};
process.stdout.write(interaction.key_date + '\t' + JSON.stringify(interaction) + '\n');
});
// 通过捕捉到的输入数据生成一个 JSON 对象
// 而后生成所需的输出结果
emitter.on(lineEvent, function(l) {
var obj;
// 通过 input 事件创建该 JSON 对象
// 如果无法创建,则丢弃该项目
//
// TODO 在此生成一个例外以代替?
if (!line || line == '') {
return;
}
try {
obj = JSON.parse(line);
} catch (err) {
process.stderr.write('Error Processing Line ' + line + '\n');
process.stderr.write(err);
return;
}
// 为每个交互对象生成一个输出结果组
for ( var i = 0; i < obj.interactions.length; i++) {
// 根据语法创建几个便捷对象
var int = obj.interactions[i];
var a = int.interaction.author;
// 提取我们需要保留的对象模型内容
var output = [ obj.id, obj.hash, int.interaction.id, a.id,
a.avatar, a.link, a.name, a.username,
int.interaction.content.escape(), int.interaction.created_at,
int.interaction.link, int.interaction.schema.version,
int.interaction.source ];
// 当输出数组完成后触发事件
emitter.emit(dataReady, output);
}
});
// 作用于每一次由 stdin 引发的数据块读取操作
process.stdin.on('data', function(chunk) {
// 新行中汇总并执行
lines = chunk.split("\n")
if (lines.length > 0) {
// 将第一套数据块添加至现有缓冲区中
line += lines[0]
if (lines.length > 1) {
// 执行当前缓冲内容
emitter.emit(lineEvent,line);
// 推进行内剩余内容并加以执行,将最新内容纳入缓冲区
for (i=1; i<lines.length; i++) {
if (i < lines.length) {
emitter.emit(lineEvent,lines[i]);
} else {
line = lines[i];
}
}
}
}
});
// 当 stdin 读取操作完成后触发
process.stdin.on('end', function() {
emitter.emit(lineEvent,line);
});
// 设置 STDIN 编码
process.stdin.setEncoding('utf8');
// 恢复 STDIN——默认暂停
process.stdin.resume();

归约器

复制代码
#!/usr/bin/env node
var events = require('events');
var emitter = new events.EventEmitter();
var remaining = '';
var lineReady = 'lineReady';
var dataReady = 'dataReady';
var interactionSummary = {
day : '',
count : 0
};
// 移除全部控制字符,从而保证输出结果由纯文本内容构成
String.prototype.escape = function() {
return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
.replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
.replace('\b', '\\b').replace('\f', '\\f');
}
// 为此附加一套数组
Array.prototype.appendArray = function(arr) {
this.push.apply(this, arr);
}
// 数据完成后,将其写入至必要的输出通道
emitter.on(dataReady, function(o) {
if (o) {
process.stdout.write(JSON.stringify(o) + '\n');
}
});
// 通过捕捉到的输入数据生成一个 JSON 对象
// 而后生成所需的输出结果
emitter.on(lineReady,function(data) {
if (!data || data == '') {
// null 数据可能是一个关闭事件,意味着数据已经处理完毕
emitter.emit(dataReady, interactionSummary);
return;
}
try {
obj = JSON.parse(data.split('\t')[1]);
} catch (err) {
process.stderr.write('Error Processing Line ' + data + '\n')
process.stderr.write(err);
return;
}
if (interactionSummary.day == '') {
interactionSummary.day = obj.key_date;
interactionSummary.count = 1;
} else {
if (obj.key_date != interactionSummary.day) {
// 数组削减完成后触发事件
emitter.emit(dataReady, interactionSummary);
interactionSummary.day = obj.key_date;
interactionSummary.count = 1;
} else {
interactionSummary.count += 1;
}
}
});
// 作用于每一个从 stdin 处进行读取的数据块
process.stdin.on('data', function(chunk) {
var capture = chunk.split('\n');
for (var i=0;i<capture.length; i++) {
if (i==0) {
emitter.emit(lineReady,remaining + capture[i]);
} else if (i<capture.length-1) {
emitter.emit(lineReady,capture[i]);
} else {
remaining = capture[i];
}
}
});
// 当 stdin 读取操作完成后触发
process.stdin.on('end', function() {
emitter.emit(lineReady,remaining);
});
// 恢复 STDIN——默认为暂停
process.stdin.resume();
// 设置 STDIN 编码
process.stdin.setEncoding('utf8');

运行示例

复制代码
aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users
--name MyNodeJsMapReduceCluster --instance-groups  InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge
InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge --
no-auto-terminate --enable-debugging --log-uri s3://<log
bucket>/EMR/logs --bootstrap-actions Path=s3://github-emr-
bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js --
service-role EMR_DefaultRole --ec2-attributes KeyName=<my key
pair>,InstanceProfile=EMR_EC2_DefaultRole
复制代码
aws emr add-
steps --cluster-id  --steps
Name=NodeJSStreamProcess,Type=Streaming,Args=--files,"s3://github-
aws-big-data-blog/aws-blog-nodejs-on-emr/scripts/sample-mapper.js
\,s3://github-aws-big-data-blog/aws-blog-nodejs-on-
emr/scripts/sample-reducer.js",-input,s3://github-aws-big-data-
blog/aws-blog-nodejs-on-emr/sample/tweets,-output,s3://<my output
bucket>/node_sample,-mapper,mapper.js,-reducer,reducer.js

在上述代码中,应被替代为我们希望创建输出结果的目标存储桶名称。执行完成后,预配置的输出存储桶及路径内将出现多个文件,其中包含整理得出的示例数据集中单一一天内出现的推文数量:

复制代码
{"day":"14 Feb 2013","count":1071}

如果大家愿意提出一点意见或者建议,请在下方的评论栏中与我们分享。

原文链接: http://blogs.aws.amazon.com/bigdata/post/TxVX5RCSD785H6/Node-js-Streaming-MapReduce-with-Amazon-EMR


感谢 Raymond Zhao 对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015 年 7 月 21 日 00:491155

评论

发布
暂无评论
发现更多内容

艺术生,我劝你Mac

zhoo299

Mac CG 艺术

绝了!Dataway让Spring Boot不再需要Controller、Service、DAO、Mapper

哈库纳

StringBoot DataQL

免费领课的活动你错过了么?

池建强

极客时间

深入浅出Mysql索引的那些事儿

猿人谷

MySQL 性能优化 索引

游戏夜读 | vim,vim,vim

game1night

珍藏已久的 OS 学习网站拿出来分享给大家

cxuan

操作系统

Vol.7 聊聊我热爱的陕西省图书馆

Lanpeng20

记录 生活,随想

Wi-Fi p2p & ap 共存

贾献华

wifi p2p ap

Dataway 4.1.5 以上版本升级指南

哈库纳

string StringBoot Dataway Hasor

无需代码!通过 Dataway 配置一个带有分页查询的接口

哈库纳

spring springboot Dataway Hasor

服务化构建-多维度的认识中台

图南日晟

软件工程 分层架构 架构设计

Vol.8 云栖小镇游记

Lanpeng20

阿里云 随笔 数字化转型

在培训机构花了好几万学Java,当了程序员还常被鄙视,这是招谁惹谁了?

四猿外

Java 学习 程序员 个人成长 转行程序员

服务化架构-状态码设计要点

图南日晟

微服务 RESTful 架构设计

我来聊聊模型驱动的前端开发

欧雷

软件工程 软件开发 前端开发 前端工程 前端架构

金灿灿的季节 - Apache DolphinScheduler收获5位新Committer

海豚调度

安装R语言编译器:

唯爱

【写作群星榜】5.22~5.28写作平台优秀作者&文章排名

InfoQ写作平台

写作平台 排行榜

完美兼容老项目!Dataway 4.1.6 返回结构的全面控制

哈库纳

spring Spring Boot Dataway Hasor

系统服务化构建-两方OAuth

图南日晟

微服务 软件工程 身份认证 架构设计

Dataway 整合 Swagger2,让 API 管理更顺畅

哈库纳

Spring Boot DataQL Dataway Hasor

小谈校招offer选择

dongh11

职场 职业规划 应届毕业 心态 招聘

tput命令介绍

唯爱

XSKY发布XMotion纳管热迁移技术,OpenStack集群迁移效率提升超10倍

XSKY融合存储

Java 学习笔记(三)数据类型

杜朋

ARTS-WEEK01

lee

ARTS 打卡计划

【快点查查】微信小程序使用流程

tomatocc

Vol.6 几个数据库相关的词

Lanpeng20

数据库 大数据 新手指南

Dataway 配置数据接口时和前端进行参数对接

哈库纳

Spring Boot DataQL Dataway Hasor

超简单入门MyBatis,看了就会了~

程序员的时光

mybatis

磁盘挂载

唯爱

如何将Node.js Streaming MapReduce引入Amazon EMR-InfoQ