如何将 Node.js Streaming MapReduce 引入 Amazon EMR

阅读数:1010 2015 年 7 月 21 日

概述

Node.js 是一套 JavaScript 框架,其核心诉求在于利用非阻塞 I/O 以及异步式事件驱动处理模型实现服务器端应用程序的高性能运行。

当客户需要处理规模庞大且复杂性较高的数据时,Node.js 能够提供一套以原生方式支持 JSON 数据结构的运行时环境。Python 及 Ruby 等编程语言都拥有面向 JSON 数据的优秀支持能力,但 Node.js 在处理包含列表及数组的结构时显得尤为得心应手。Node.js 还提供一套高性能且具备可扩展能力的备用方案,能够以原生方式将 JSON 数据作为对象加以处理。目前大家已经可以获得专门针对 Node.js 的 AWS SDK( http://aws.amazon.com/sdkfornodejs ),其允许我们将 Node.js 应用程序与 AWS 服务加以紧密整合。

在今天的文章中,大家将了解到如何在 Amazon Elastic MapReduce(简称 Amazon EMR)当中安装 Node.js、如何构建一款 Node.js 应用程序并将其与 Hadoop Streaming 并发处理架构相整合、外加如何在 AWS 之上部署并运行我们的 Node.js MapReduce 应用程序。要了解更多与 Amazon EMR 以及 Hive 项目相关之细节信息,请大家点击此处查看专题教程。

本文假定大家已经熟知与 Hadoop、Hive 以及 Amazon EMR 相关的专业知识。

用例

在本文当中,我们将使用来自 Twitter 的数据,这部分数据中包含由推文、转发、回复以及直发信息所构成的复杂信息体系。每个文件中都包含有单一 Twitter 事务数据块,我们需要将其内容写入至 Amazon Simple Storage Service(简称 Amazon S3)并随时加以读取。我们希望针对 Hive 的实际需求对数据进行转换,旨在对转发率、每秒推文数量以及每用户直发信息数量等指标进行汇总。

样本输入数据

我们的数据表现出一套由大量 Twitter 用户所构成的复杂交互图谱,其中包括推文转发与回复内容,如下图所示:

为了进行数据发现,我们可以利用 Hive 配合 JsonSerde(可参看 http://aws.amazon.com/articles/2854 )对数据进行调查。不过由于这套图谱非常复杂而且存在自我指涉,因此大部分 Hive JsonSerde 无法将这部分数据显示为表格。能够在 Node.js 中进行数据处理,我们能够更轻松地利用简单语法实现数据图谱导航。

安装 Node.js

我们可以利用以下 bootstrap 操作将 Node.js 安装在 Amazon EMR 集群当中:

---bootstrap-actions Path=s3://github-emr-bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js

(如果大家此前从未接触过 bootstrap 操作,请点击此处查阅Amazon EMR 提供的说明文档。)

编写 MapReduce 代码

现在 Node.js 已经被安装在 Amazon EMR 集群之上,而 Hadoop Streaming 也得到正确配置,接下来我们需要运行自己的 Node.js 应用程序以完成映射与归约操作。利用 Hadoop Streaming 处理架构,我们能够为流任务指定所需要使用的映射与归约代码。

与其它 Hadoop Streaming 兼容语言一样,我们必须从 Amazon S3 数据存储或者 HDFS 文件系统当中利用标准输入(即 stdin)方式实现数据读取。在 Node.js 当中,stdin 能够利用处理全局对象的方式获得可访问能力(参见 http://nodejs.org/api/process.html )。这就使我们得以访问多种控制机制,进而通过管理输入与输出数据流对数据进行读取与写入,例如 process.stdin 与 process.stdout。

我们的 MapReduce 程度必须执行五项主要函数以实现从 Hadoop Streaming 以及输出结果中读取数据。

配置标准输入

在默认情况下,process.stdin 输入通道会处于暂停状态,且不触发任何事件。在将其启用之前,我们必须首先配置所需的字符集编码。对于非多字节字符集而言,我们可以使用 UTF-8。因此,我们映射或者归约方案的主体流程应该从这里开始:

process.stdin.setEncoding('utf8'); 

或者,我们也可以使用 UTF-16 实现多字节支持。在此之后,我们必须启用 stdin 对事件进行恢复以及触发:

process.stdin.resume(); 

处理来自 STDIN 的输入内容

当我们的 Node.js 应用程序使用该 stdin.data 事件时,process.stdin 通道会就此发出通知——stdin.data 事件会在一定数量的数据可用于读取时被触发。我们的 Node.js 应用程序必须对这部分数据进行缓存处理以备后续使用,因为事件所提供的每一个数据块可能都仅仅属于标准输入内容中全部可用数据的一小部分。由于我们此前通过配置让 Hadoop Streaming 使用非分割式 FileInputFormat,因此我们会在单一映射器中获取到完整的 JSON 数据,并能够将该文件作为整体加以处理。有鉴于此,我们可以通过以下代码将数据块缓存于 data 事件当中:

var line = ‘’;
// fires on every block of data read from stdin
process.stdin.on('data', function(chunk) {
    // chunk and emit on newline
    lines = chunk.split("\n")
     
    if (lines.length > 0) {
        // append the first chunk to the existing buffer
        line += lines[0]
         
        if (lines.length > 1) {
            // emit the current buffer
            emitter.emit(lineEvent,line);
 
            // go through the rest of the lines and emit them, buffering the last
            for (i=1;i<lines.length; i++) {
                if (i<lines.length) {
                    emitter.emit(lineEvent,lines[i]);
                } else {
                    line = lines[i];
                }
            }
        }
    }
});

上述操作会将全部数据块附加至行变量处,并在每一次发现新的换行符时触发“lineReady”事件。

处理完整的 stdin 数据

在全部来自 stdin 的数据被读取完成后,该流程将触发 stdin.end 事件。我们已经将全部数据收集到行缓冲区当中,这样我们只需要利用以下代码刷新最后一行数据:

// fires when stdin is completed being read
process.stdin.on('end', function() {
    emitter.emit(lineEvent,line);
});

每当新的内容行准备就绪时,我们都将利用以下代码将其排序至一个 JSON 对象当中:

try {
    obj = JSON.parse(line);
} catch (err) {
    process.stderr.write('Error Processing Line ' + line + '\n');
    process.stderr.write(err);
    return;
}

我们可以选择把复杂的 JSON 数据简化为普通输出结果,以供 Hive JsonSerde 进行加载,或者选择生成 CSV 或者 XML 数据来代替。

以 Hadoop 兼容格式写入数据

对于某些特定的 MapReduce 操作类型,我们需要确保其归约器能够获取到归属于特定类型的全部数据。为了实现这一目标,我们必须指定一个键值,并保证 Hadoop 在调用该归约器之前会首先对输出结果进行分类。在进行文本内容处理时,我们会利用由\t 标签开头的字符串来表示这个值。

要执行存储或者移除方面的数据写入操作,我们需要利用 process.stdout.write() 向 stdout 实施写入。

制作可执行文件

Amazon EMR 利用命令行语法调用的方式运行映射器与归约器,例如“./mapper.js”。因此,我们需要确保我们所构建的 Node.js 模块能够通过命令行实现调用。为达成这一目标,我们在映射器或者归约器文件的开头处添加一条标准“shebang”命令,这样它就能调用 Node.js 并运行脚本内容:

#!/usr/bin/env node 

接下来,大家可以通过命令行调用的方式测试自己的映射器代码了(以下示例假定代码位于名为 Mapper.js 的文件当中):

./mapper.js < input-file-path 

部署与运行

在编写了自己的映射器与归约器之后,接下来我们将其传输至 Amazon S3 当中,而后利用 Amazon EMR 针对部分输入数据运行 MapReduce。

以下示例讲解了如何利用 Amazon EMR 命令行执行各个步骤(参见 http://aws.amazon.com/developertools/2264 ),不过大家也可以在 Amazon EMR 控制台(参见 console.aws.amazon.com/elasticmapreduce )或者 Amazon EMR API(参见 http://docs.aws.amazon.com/ElasticMapReduce/latest/API/Welcome.html?r=8857 )中利用命令实现同样的效果。我们将展示如何以自动方式利用 AWS 命令行工具运行该应用程序,但大家完全可以使用 AWS Web Console 或者 AWS Data Pipeline 完成同样的工作。我们可以使用 --create-cluster 命令启动一套新的 Amazon EMR 集群,同时利用以下代码启动该集群并捃行我们的 Node.js bootstrap 操作:

aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users
 --name MyNodeJsMapReduceCluster --instance-groups  InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge 
InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge --
no-auto-terminate --enable-debugging --log-uri s3:///logs --
bootstrap-actions Path=s3://github-emr-bootstrap-
actions/node/install-nodejs.sh,Name=InstallNode.js --ec2-attributes 
KeyName=<my key pair> 

这样我们就创建了一套始终启用的集群,其中包含配备 3.3.1 AMI、双核心节点以及一个主节点,全部采用 m3.xlarge 实例类型。以上代码同时为指定存储桶设定了调试与日志记录机制,并通过 bootstrap 操作完成了 Node.js 的启动时安装。除此之外,代码中还使用了 Amazon EC2 密钥对,从而将 SSH 安全机制引入该 Hadoop 集群。

接下来,我们将添加 Hadoop Streaming 流程,旨在处理自己的输入数据。在以下代码中,大家需要把 <my cluster ID> 替换为自己的实际集群 ID:

aws emr add-steps --cluster-id <my cluster ID> 
--steps Name=NodeJSStreamProcess,Type=Streaming

我们通过创建一套文件系统参考(利用—files 参数)添加自己的映射器与归约器 JavaScript 文件,而后将该基础文件名通过 -mapper 与 -reducer 进行引用:

Args=--files,"s3://<path to mapper>/mapper.js\,s3://<path to 
reducer>/reducer.js",-mapper,mapper.js,-reducer,reducer.js 

而后,我们添加该输入与输出文件的位置:

-input,s3://<path-to-input-files>,-output,s3://<path-to-output-files>

这样一来,我们就获得了如下完整命令行调用代码:

aws emr add-steps --cluster-id <my cluster ID> 
--steps Name=NodeJSStreamProcess,Type=Streaming,Args=--files,
"s3://<path to mapper>/mapper.js\,s3://<path to reducer>/reducer.js",
-input,s3://<path-to-input-files>,-output,s3://<
path-to-output-files>,-mapper,mapper.js,-reducer,reducer.js

大家只需要提供相应的时间量用于该流程运行,Hadoop 集群不再需要其它迭代实现数据生成。请确保在运行上述示例时,大家在执行完成后及时关闭自己的集群。我们可以利用以下 Amazon EMR 命令完成集群关闭操作:

aws emr terminate-clusters --cluster-ids <my cluster ID> 

总结

Node.js 能够在实现 MapReduce 应用程序快速执行效果的同时,利用简洁的原生语法对高复杂性 JSON 数据加以处理。通过 Amazon EMR 配置选项,大家可以轻松运行基于 Node.js 的应用程序,并随时间推移或者输入数据量的增加提升其规模。

附录——映射 / 归约应用程序示例

以下 MapReduce 程序旨在以天为单位将推文内容输出为高复杂性 JSON 结构化数据。在本示例中,Twitter 数据由 DataSift(来自 datasift.com)负责收集。我们去掉了其中的某些特殊字符,例如换行符与制表符,并将推文 created_at 字段输出为键。归约器随后根据日期对这部分数据加以排序,并输出推文的整体数量。

映射器

#!/usr/bin/env node
 
var events = require('events');
var emitter = new events.EventEmitter();
 
var line = '';
var lineEvent = 'line';
var dataReady = 'dataReady';
 
// 移除全部控制字符,从而保证输出结果由纯文本内容构成 
String.prototype.escape = function() {
    return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
            .replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
            .replace('\b', '\\b').replace('\f', '\\f');
}
 
// 为此附加一套数组 
Array.prototype.appendArray = function(arr) {
    this.push.apply(this, arr);
}
 
// 数据完成后,将其写入至必要的输出通道 
emitter.on(dataReady, function(arr) {
    var dateComponents = arr[9].split(' ');
    var d = [dateComponents[1],dateComponents[2],dateComponents[3]].join(' ');
     
    var interaction = {
        key_date : d,
        content: {
            objectId : arr[0],
            hash : arr[1],
            id : arr[2],
            author_id : arr[3],
            author_avatar : arr[4],
            author_link : arr[5],
            author_name : arr[6],
            author_username : arr[7],
            content : arr[8],
            created_at : arr[9],
            link : arr[10],
            schema_version : arr[11],
            source : arr[12]
        }
    };
 
    process.stdout.write(interaction.key_date + '\t' + JSON.stringify(interaction) + '\n');
});
 
// 通过捕捉到的输入数据生成一个 JSON 对象 
// 而后生成所需的输出结果 
emitter.on(lineEvent, function(l) {
    var obj;
 
    // 通过 input 事件创建该 JSON 对象 
    // 如果无法创建,则丢弃该项目 
    //
    // TODO 在此生成一个例外以代替?
    if (!line || line == '') {
        return;
    }
     
    try {
        obj = JSON.parse(line);
    } catch (err) {
        process.stderr.write('Error Processing Line ' + line + '\n');
        process.stderr.write(err);
        return;
    }
     
    // 为每个交互对象生成一个输出结果组 
    for ( var i = 0; i < obj.interactions.length; i++) {
        // 根据语法创建几个便捷对象 
        var int = obj.interactions[i];
        var a = int.interaction.author;
         
        // 提取我们需要保留的对象模型内容 
        var output = [ obj.id, obj.hash, int.interaction.id, a.id,
                a.avatar, a.link, a.name, a.username,
                int.interaction.content.escape(), int.interaction.created_at,
                int.interaction.link, int.interaction.schema.version,
                int.interaction.source ];
         
        // 当输出数组完成后触发事件 
        emitter.emit(dataReady, output);
    }
});
 
// 作用于每一次由 stdin 引发的数据块读取操作 
process.stdin.on('data', function(chunk) {
    // 新行中汇总并执行 
    lines = chunk.split("\n")
     
    if (lines.length > 0) {
        // 将第一套数据块添加至现有缓冲区中 
        line += lines[0]
         
        if (lines.length > 1) {
            // 执行当前缓冲内容 
            emitter.emit(lineEvent,line);
 
            // 推进行内剩余内容并加以执行,将最新内容纳入缓冲区 
            for (i=1; i<lines.length; i++) {
                if (i < lines.length) {
                    emitter.emit(lineEvent,lines[i]);
                } else {
                    line = lines[i];
                }
            }
        }
    }
});
 
// 当 stdin 读取操作完成后触发 
process.stdin.on('end', function() {
    emitter.emit(lineEvent,line);
});
 
// 设置 STDIN 编码 
process.stdin.setEncoding('utf8');
 
// 恢复 STDIN——默认暂停 
process.stdin.resume();

归约器

#!/usr/bin/env node
 
var events = require('events');
var emitter = new events.EventEmitter();
 
var remaining = '';
var lineReady = 'lineReady';
var dataReady = 'dataReady';
 
var interactionSummary = {
    day : '',
    count : 0
};
 
// 移除全部控制字符,从而保证输出结果由纯文本内容构成 
String.prototype.escape = function() {
    return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
            .replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
            .replace('\b', '\\b').replace('\f', '\\f');
}
 
// 为此附加一套数组 
Array.prototype.appendArray = function(arr) {
    this.push.apply(this, arr);
}
 
// 数据完成后,将其写入至必要的输出通道 
emitter.on(dataReady, function(o) {
    if (o) {
        process.stdout.write(JSON.stringify(o) + '\n');
    }
});
 
// 通过捕捉到的输入数据生成一个 JSON 对象 
// 而后生成所需的输出结果 
emitter.on(lineReady,function(data) {   
    if (!data || data == '') {
        // null 数据可能是一个关闭事件,意味着数据已经处理完毕 
        emitter.emit(dataReady, interactionSummary);
        return;
    }
     
    try {
        obj = JSON.parse(data.split('\t')[1]);
    } catch (err) {
        process.stderr.write('Error Processing Line ' + data + '\n')
        process.stderr.write(err);
        return;
    }
 
    if (interactionSummary.day == '') {
        interactionSummary.day = obj.key_date;
        interactionSummary.count = 1;       
    } else {
        if (obj.key_date != interactionSummary.day) {
            // 数组削减完成后触发事件 
            emitter.emit(dataReady, interactionSummary);
            interactionSummary.day = obj.key_date;
            interactionSummary.count = 1;
        } else {
            interactionSummary.count += 1;
        }
    }
});
 
// 作用于每一个从 stdin 处进行读取的数据块 
process.stdin.on('data', function(chunk) {
    var capture = chunk.split('\n');
 
    for (var i=0;i<capture.length; i++) {
        if (i==0) {
            emitter.emit(lineReady,remaining + capture[i]);
        } else if (i<capture.length-1) {
            emitter.emit(lineReady,capture[i]);
        } else {
            remaining = capture[i];
        }
    }
});
 
// 当 stdin 读取操作完成后触发 
process.stdin.on('end', function() {
    emitter.emit(lineReady,remaining);
});
 
// 恢复 STDIN——默认为暂停 
process.stdin.resume();
 
// 设置 STDIN 编码 
process.stdin.setEncoding('utf8');

运行示例

aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users 
--name MyNodeJsMapReduceCluster --instance-groups  InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge 
InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge --
no-auto-terminate --enable-debugging --log-uri s3://<log 
bucket>/EMR/logs --bootstrap-actions Path=s3://github-emr-
bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js --
service-role EMR_DefaultRole --ec2-attributes KeyName=<my key 
pair>,InstanceProfile=EMR_EC2_DefaultRole
aws emr add-
steps --cluster-id  --steps 
Name=NodeJSStreamProcess,Type=Streaming,Args=--files,"s3://github-
aws-big-data-blog/aws-blog-nodejs-on-emr/scripts/sample-mapper.js
\,s3://github-aws-big-data-blog/aws-blog-nodejs-on-
emr/scripts/sample-reducer.js",-input,s3://github-aws-big-data-
blog/aws-blog-nodejs-on-emr/sample/tweets,-output,s3://<my output
 bucket>/node_sample,-mapper,mapper.js,-reducer,reducer.js

在上述代码中,<my output bucket> 应被替代为我们希望创建输出结果的目标存储桶名称。执行完成后,预配置的输出存储桶及路径内将出现多个文件,其中包含整理得出的示例数据集中单一一天内出现的推文数量:

{"day":"14 Feb 2013","count":1071} 

如果大家愿意提出一点意见或者建议,请在下方的评论栏中与我们分享。

原文链接: http://blogs.aws.amazon.com/bigdata/post/TxVX5RCSD785H6/Node-js-Streaming-MapReduce-with-Amazon-EMR


感谢 Raymond Zhao 对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

收藏

评论

微博

发表评论

注册/登录 InfoQ 发表评论