NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

探索 Hadoop OutputFormat

  • 2012-01-20
  • 本文字数:6837 字

    阅读完需:约 22 分钟

Hadoop 常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角度来看,整个过程就是 Hadoop 接收输入文件、使用自定义转换(Map-Reduce 步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月 InfoQ 展示了怎样在第一个步骤中,使用 InputFormat 类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写入输出文件。OutputFormat 将 Map/Reduce 作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示 OutputFormts 的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。

OutputFormats 是做什么的?

OutputFormt 接口决定了在哪里以及怎样持久化作业结果。Hadoop 为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其中的某个类或接口即可。你可能已经熟悉了默认的 OutputFormat,也就是 TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。为了避免文本文件的弊端,Hadoop 提供了 SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进行压缩。下面是 Hadoop 提供的类层次结构:

  • FileOutputFormat(实现 OutputFormat 接口)—— 所有 OutputFormats 的基类
    • MapFileOutputFormat —— 一种使用部分索引键的格式
    • SequenceFileOutputFormat —— 二进制键值数据的压缩格式
      • SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
    • TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
    • MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
      • MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
      • MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件

OutputFormat 提供了对 RecordWriter 的实现,从而指定如何序列化数据。 RecordWriter 类可以处理包含单个键值对的作业,并将结果写入到 OutputFormat 中准备好的位置。RecordWriter 的实现主要包括两个函数:“write”和“close”。“write”函数从 Map/Reduce 作业中取出键值对,并将其字节写入磁盘。LineRecordWriter 是默认使用的 RecordWriter,它是前面提到的 TextOutputFormat 的一部分。它写入的内容包括:

  • 键 (key) 的字节 (由 getBytes() 函数返回)
  • 一个用以定界的制表符
  • 值 (value) 的字节(同样由 getBytes() 函数返回)
  • 一个换行符

“close”函数会关闭 Hadoop 到输出文件的数据流。

我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:

复制代码
|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
'-- part-00005
{1}

默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由 reducers 在作业结束时写入(如果没有 reducers 会由 mapper 写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。

自定义 OutputFormat

从前面我们已经看到,OutputFormat 类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的原因之一是为了将 Map/Reduce 作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国家的结果复制一份到该国的数据中心去。使用默认的 OutputFormat 时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分”文件的内容的。而通过使用自定义的 OutputFormat,你可以为每个国家创建一个子目录的布局,如下:

复制代码
|-- output-directory
| |-- France
| | |-- part-00000
| | |-- part-00001
| | '-- part-00002
... |
| '-- Zimbabwe
| |-- part-00000
| |-- part-00001
| '-- part-00002

其中每个部分文件都具有键值对(“搜索词汇”=> 频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承 MultipleTextOutputFormat 类,以获得所需的行为。

自定义 OutputFormat 还有一些其他的原因,以名为 ElephantDB 的项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让 Map/Reduece 作业结果可以像分布式服务一样被查询。ElephantDB 写入的并不是文本文件,而是使用自定义的 OutputFormat 将结果写成 BerkeleyDB 文件,其中这些文件使用作业输出的键进行索引。之后使用某个服务加载 BerkeleyDB 文件,可以提供低延滞的任意键查找。类似的系统还有 HBase 和 Voldemort,它们可以存储 Hadoop 生成的键值数据。ElephantDB 重点关注的是怎样与 Hadoop 批量式更新进行简易紧密的集成。

多路输出

为了解决上面的搜索日志的问题,我们继承了 MultipleTextOutputFormat 类,并根据被写入的键值来选择输出目录。我们的 Map/Reduce 作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于 MultipleTextOutputFormat 已经知道如何写入文本文件,因此并不需要为 OutputFormat 实现序列化功能。清单 1 实现了该类:

复制代码
1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9 /**
10 * Use they key as part of the path for the final output file.
11 */
12 @Override
13 protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14 return new Path(key.toString(), leaf).toString();
15 }
16
17 /**
18 * When actually writing the data, discard the key since it is already in
19 * the file path.
20 */
21 @Override
22 protected Text generateActualKey(Text key, Text value) {
23 return null;
24 }
25 }

清单 1:MultipleTextOutputFormat 子类样例

MultipleTextOutputFormatByKey 类的 generateActualFileNameForKeyValue 方法指定了作业输出的存储位置(第 13 行)。对于每组由 Map/Reduce 作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前看到的“part-0000”,它在每个 reducer 中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个 reducer 产生的键为“France”、值为“soccer 5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。

要使用这个类,需确保 Hadoop 包含了这个自定义类的 jar,并使用完整的类名作为“-outputformat”的参数:

复制代码
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
-outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
-input search-logs \
-output search-frequency-by-country \
-mapper parse-logs.py \
-reducer count-searches.py

清单 1 是 oddjob 项目中某个类的 Java 实现。oddjob 是一个开源库,提供了多种 MultipleTextOutputFormat。虽然这个库面向的是 Hadoop 的流特性,但是它也可以用在产生文本键值输出的其他作业中。

为服务准备输出

在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被 ElephantDB 服务加载。正如前面所讨论的,序列化部分会由 RecordWriter 的实现来处理。在 LineRecordWriter 类将字节流写入输出文件的同时,ElephantRecordWriter 还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。

复制代码
1 public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3 FileSystem _fs;
4 Args _args;
5 Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6 Progressable _progressable;
7 LocalElephantManager _localManager;
8
9 int _numWritten = 0;
10 long _lastCheckpoint = System.currentTimeMillis();
11
12 public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13 _fs = Utils.getFS(args.outputDirHdfs, conf);
14 _args = args;
15 _progressable = progressable;
16 _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17 }
18
19 private String remoteUpdateDirForShard(int shard) {
20 if(_args.updateDirHdfs==null) return null;
21 else return _args.updateDirHdfs + "/" + shard;
22 }
23
24 public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25 LocalPersistence lp = null;
26 LocalPersistenceFactory fact = _args.spec.getLPFactory();
27 Map<String, Object> options = _args.persistenceOptions;
28 if(_lps.containsKey(shard.get())) {
29 lp = _lps.get(shard.get());
30 } else {
31 String updateDir = remoteUpdateDirForShard(shard.get());
32 String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33 lp = fact.openPersistenceForAppend(localShard, options);
34 _lps.put(shard.get(), lp);
35 progress();
36 }
37
38 _args.updater.updateElephant(lp, record.key, record.val);
39
40 _numWritten++;
41 if(_numWritten % 25000 == 0) {
42 long now = System.currentTimeMillis();
43 long delta = now - _lastCheckpoint;
44 _lastCheckpoint = now;
45 LOG.info("Wrote last 25000 records in " + delta + " ms");
46 _localManager.progress();
47 }
48 }
49
50 public void close(Reporter reporter) throws IOException {
51 for(Integer shard: _lps.keySet()) {
52 String lpDir = _localManager.localTmpDir("" + shard);
53 LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54 _lps.get(shard).close();
55 LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56 progress();
57 String remoteDir = _args.outputDirHdfs + "/" + shard;
58 if(_fs.exists(new Path(remoteDir))) {
59 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60 _fs.delete(new Path(remoteDir), true);
61 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62 }
63 LOG.info("Copying " + lpDir + " to " + remoteDir);
64 _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65 LOG.info("Copied " + lpDir + " to " + remoteDir);
66 progress();
67 }
68 _localManager.cleanup();
69 }
70
71 private void progress() {
72 if(_progressable!=null) _progressable.progress();
73 }
74 }

清单 2:从 ElephantDB 中摘录的某个 RecordWriter 子类

ElephantDB 的工作方式是通过跨越若干个 LocalPersistence 对象(BerkeleyDB 文件)来对数据进行分片(划分)。ElephantRecordWriter 类中的 write 函数拿到分片 ID,并检查该分片是否已经打开(第 28 行),如果没有则打开并创建一个新的本地文件(第 33 行)。第 38 行的 updateElephant 调用将作业输出的键值对写入到 BerkeleyDB 文件。

当关闭 ElephantRecordWriter 时,该类在第 64 行会复制 BerkeleyDB 文件到 HDFS 中,且可以随意选择是否覆盖旧文件。接下去的 progress 方法调用会通知 Hadoop 当前的 RecordWriter 正在按计划进行,这有点类似于真实 Map/Reduce 作业中的状态或计数器更新。

下一步是利用 ElephantRecordWriter 来实现 OutputFormat。要理解此清单中的代码,重点是了解 Hadoop JobConf 对象封装了什么。顾名思义,JobConf 对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及 mapper 和 reducer 类。清单 3 展示了两个自定义类是如何共同工作的:

复制代码
1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2 public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4 public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5 return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6 }
7
8 public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9 Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10 fs = Utils.getFS(args.outputDirHdfs, conf);
11 if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12 throw new InvalidJobConfException("Speculative execution should be false");
13 }
14 if(fs.exists(new Path(args.outputDirHdfs))) {
15 throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16 }
17 if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18 throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19 }
20 }
21 }

清单 3:从 ElephantDB 中摘录的某个 OutputFormat 实现

正如前面所看到的,OutputFormat 有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat 的数据存储位置是通过检查 JobConf 以及在第 14 和 17 行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由 getRecordWriter 函数处理,它的返回结果是清单 2 中的 ElephantRecordWriter 对象。

从 Hadoop 的角度来看,当 Map/Reduce 作业结束并且每个 reducer 产生了键值对流的时候,这些类会派上用场。Hadoop 会以作业配置为参数调用 checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用 getRecordWriter 以返回可以写入流数据的对象。当所有的键值对都被写入后,Hadoop 会调用 writer 中的 close 函数,将数据提交到 HDFS 并结束该 reducer 的职责。

总结

OutputFormat 是 Hadoop 框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现 OutputFormat 和覆写几个方法一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。

关于作者

Jim Blomo ( @jimblomo ) 热衷于开发强劲优雅的系统来操控数据。在 Yelp 公司中,他负责管理一个日益增长的数据挖掘团队,该团队使用 Hadoop,mrjob 以及 oddjob 处理 TB 级的数据。在加入 Yelp 公司前,他曾经为创业公司和 Amazon 构建基础设施。他喜爱美食,文化,以及同妻子一道漫步在旧金山湾区的户外。

查看英文原文: Exploring Hadoop OutputFormat


感谢侯伯薇对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2012-01-20 00:009616
用户头像

发布了 125 篇内容, 共 35.7 次阅读, 收获喜欢 5 次。

关注

评论

发布
暂无评论
发现更多内容

一种高效解决12306第三方抢票不公平乱收费体验差的新技术

巨公摇号创始人钱庆照

12306 第三方付费抢票 随机信标 巨公摇号 抢火车票

开发者集结令丨Farris-Vue前端组件创新挑战赛开赛啦!

inBuilder低代码平台

前端 低代码 开发

Dock栏快速启动程序uDock for Mac 激活版

影影绰绰一往直前

博客生成编辑器MWeb Pro for Mac v4.5.6中文激活版

影影绰绰一往直前

海外云手机:入局海外市场的最佳利器

Ogcloud

云手机 海外云手机 云手机海外版 国外云手机

为什么VPS比传统虚拟空间更受欢迎?深度解析!

一只扑棱蛾子

VPS

Tiktok云手机是什么,做tiktok养号有什么优势?

Ogcloud

云手机 海外云手机 tiktok云手机 云手机海外版

Blocs for mac:可视化Web设计,全新的方法来构建现代化、高质量的静态网站

Rose

mac电脑音乐制作软件 Ableton Live 12 for mac中文激活

影影绰绰一往直前

Ableton Live 12下载 Ableton Live 12激活版 Ableton Live 12中文

将Excel转换为HTML:Easy Data Transform for mac

Rose

前端开发CSS实用的技巧有哪些

小魏写代码

云计算 - 弹性计算技术全解与实践

快乐非自愿限量之名

云计算 物联网 弹性计算

服务器操作卡,出现蓝屏、死机,该怎么解决

德迅云安全杨德俊

运营商数智化缩影:一部哑资源的资源管理史

鲸品堂

网络 资源 运营商 企业号 2 月 PK 榜

Axure RP 10中文汉化版 交互式原型设计

Rose

告别 GPU 焦虑,玩转极致性价比的 CPU 文生图

阿里巴巴云原生

阿里云 Kubernetes 云原生

microsoft 365永久激活密钥

Rose

可视化代码编辑器Blocs for mac v5.2.1激活版下载

影影绰绰一往直前

AE如何导入LUTS呢 ?After Effects导入lut使用详细教程

Rose

文本管理软件 FSNotes for mac v6.6.7中文版

影影绰绰一往直前

photoshop2024硬件要求

Rose

macOS Big Sur 11安装包(macOS11系统下载) v11.7.10正式版

Rose

从技术到管理:如何避免失去专业指导能力的陷阱?

码哥字节

程序员 架构师 职业发展

数据库管理软件 DBeaverUE for Mac v23.3.4旗舰激活版

影影绰绰一往直前

Final Cut Pro 中文基础教程:多机位剪辑

Rose

XMind for mac XMind思维导图 v24.01中文版

Rose

ai全称是什么?好用的AI软件有哪些?这14款一定要知道。

彭宏豪95

AI 在线白板 AIGC AI绘画 效率软件

用云手机打造tiktok账号需要注意些什么?

Ogcloud

云手机 海外云手机 tiktok云手机 云手机海外版

好“云”来!盘点春节与云计算息息相关的那些事儿

Finovy Cloud

云计算 云时代

VMware Fusion Pro 13(VM虚拟机)中文破解版安装教程

Rose

Easy Data Transform for Mac v1.46.3激活版下载 强大数据转换工具

影影绰绰一往直前

探索Hadoop OutputFormat_大数据_Jim.Blomo_InfoQ精选文章