大咖直播-鸿蒙原生开发与智能提效实战!>>> 了解详情
写点什么

探索 Hadoop OutputFormat

  • 2012-01-20
  • 本文字数:6837 字

    阅读完需:约 22 分钟

Hadoop 常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角度来看,整个过程就是 Hadoop 接收输入文件、使用自定义转换(Map-Reduce 步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月 InfoQ 展示了怎样在第一个步骤中,使用 InputFormat 类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写入输出文件。OutputFormat 将 Map/Reduce 作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示 OutputFormts 的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。

OutputFormats 是做什么的?

OutputFormt 接口决定了在哪里以及怎样持久化作业结果。Hadoop 为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其中的某个类或接口即可。你可能已经熟悉了默认的 OutputFormat,也就是 TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。为了避免文本文件的弊端,Hadoop 提供了 SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进行压缩。下面是 Hadoop 提供的类层次结构:

  • FileOutputFormat(实现 OutputFormat 接口)—— 所有 OutputFormats 的基类
    • MapFileOutputFormat —— 一种使用部分索引键的格式
    • SequenceFileOutputFormat —— 二进制键值数据的压缩格式
      • SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
    • TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
    • MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
      • MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
      • MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件

OutputFormat 提供了对 RecordWriter 的实现,从而指定如何序列化数据。 RecordWriter 类可以处理包含单个键值对的作业,并将结果写入到 OutputFormat 中准备好的位置。RecordWriter 的实现主要包括两个函数:“write”和“close”。“write”函数从 Map/Reduce 作业中取出键值对,并将其字节写入磁盘。LineRecordWriter 是默认使用的 RecordWriter,它是前面提到的 TextOutputFormat 的一部分。它写入的内容包括:

  • 键 (key) 的字节 (由 getBytes() 函数返回)
  • 一个用以定界的制表符
  • 值 (value) 的字节(同样由 getBytes() 函数返回)
  • 一个换行符

“close”函数会关闭 Hadoop 到输出文件的数据流。

我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:

复制代码
|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
'-- part-00005
{1}

默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由 reducers 在作业结束时写入(如果没有 reducers 会由 mapper 写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。

自定义 OutputFormat

从前面我们已经看到,OutputFormat 类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的原因之一是为了将 Map/Reduce 作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国家的结果复制一份到该国的数据中心去。使用默认的 OutputFormat 时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分”文件的内容的。而通过使用自定义的 OutputFormat,你可以为每个国家创建一个子目录的布局,如下:

复制代码
|-- output-directory
| |-- France
| | |-- part-00000
| | |-- part-00001
| | '-- part-00002
... |
| '-- Zimbabwe
| |-- part-00000
| |-- part-00001
| '-- part-00002

其中每个部分文件都具有键值对(“搜索词汇”=> 频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承 MultipleTextOutputFormat 类,以获得所需的行为。

自定义 OutputFormat 还有一些其他的原因,以名为 ElephantDB 的项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让 Map/Reduece 作业结果可以像分布式服务一样被查询。ElephantDB 写入的并不是文本文件,而是使用自定义的 OutputFormat 将结果写成 BerkeleyDB 文件,其中这些文件使用作业输出的键进行索引。之后使用某个服务加载 BerkeleyDB 文件,可以提供低延滞的任意键查找。类似的系统还有 HBase 和 Voldemort,它们可以存储 Hadoop 生成的键值数据。ElephantDB 重点关注的是怎样与 Hadoop 批量式更新进行简易紧密的集成。

多路输出

为了解决上面的搜索日志的问题,我们继承了 MultipleTextOutputFormat 类,并根据被写入的键值来选择输出目录。我们的 Map/Reduce 作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于 MultipleTextOutputFormat 已经知道如何写入文本文件,因此并不需要为 OutputFormat 实现序列化功能。清单 1 实现了该类:

复制代码
1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9 /**
10 * Use they key as part of the path for the final output file.
11 */
12 @Override
13 protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14 return new Path(key.toString(), leaf).toString();
15 }
16
17 /**
18 * When actually writing the data, discard the key since it is already in
19 * the file path.
20 */
21 @Override
22 protected Text generateActualKey(Text key, Text value) {
23 return null;
24 }
25 }

清单 1:MultipleTextOutputFormat 子类样例

MultipleTextOutputFormatByKey 类的 generateActualFileNameForKeyValue 方法指定了作业输出的存储位置(第 13 行)。对于每组由 Map/Reduce 作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前看到的“part-0000”,它在每个 reducer 中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个 reducer 产生的键为“France”、值为“soccer 5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。

要使用这个类,需确保 Hadoop 包含了这个自定义类的 jar,并使用完整的类名作为“-outputformat”的参数:

复制代码
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
-outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
-input search-logs \
-output search-frequency-by-country \
-mapper parse-logs.py \
-reducer count-searches.py

清单 1 是 oddjob 项目中某个类的 Java 实现。oddjob 是一个开源库,提供了多种 MultipleTextOutputFormat。虽然这个库面向的是 Hadoop 的流特性,但是它也可以用在产生文本键值输出的其他作业中。

为服务准备输出

在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被 ElephantDB 服务加载。正如前面所讨论的,序列化部分会由 RecordWriter 的实现来处理。在 LineRecordWriter 类将字节流写入输出文件的同时,ElephantRecordWriter 还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。

复制代码
1 public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3 FileSystem _fs;
4 Args _args;
5 Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6 Progressable _progressable;
7 LocalElephantManager _localManager;
8
9 int _numWritten = 0;
10 long _lastCheckpoint = System.currentTimeMillis();
11
12 public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13 _fs = Utils.getFS(args.outputDirHdfs, conf);
14 _args = args;
15 _progressable = progressable;
16 _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17 }
18
19 private String remoteUpdateDirForShard(int shard) {
20 if(_args.updateDirHdfs==null) return null;
21 else return _args.updateDirHdfs + "/" + shard;
22 }
23
24 public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25 LocalPersistence lp = null;
26 LocalPersistenceFactory fact = _args.spec.getLPFactory();
27 Map<String, Object> options = _args.persistenceOptions;
28 if(_lps.containsKey(shard.get())) {
29 lp = _lps.get(shard.get());
30 } else {
31 String updateDir = remoteUpdateDirForShard(shard.get());
32 String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33 lp = fact.openPersistenceForAppend(localShard, options);
34 _lps.put(shard.get(), lp);
35 progress();
36 }
37
38 _args.updater.updateElephant(lp, record.key, record.val);
39
40 _numWritten++;
41 if(_numWritten % 25000 == 0) {
42 long now = System.currentTimeMillis();
43 long delta = now - _lastCheckpoint;
44 _lastCheckpoint = now;
45 LOG.info("Wrote last 25000 records in " + delta + " ms");
46 _localManager.progress();
47 }
48 }
49
50 public void close(Reporter reporter) throws IOException {
51 for(Integer shard: _lps.keySet()) {
52 String lpDir = _localManager.localTmpDir("" + shard);
53 LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54 _lps.get(shard).close();
55 LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56 progress();
57 String remoteDir = _args.outputDirHdfs + "/" + shard;
58 if(_fs.exists(new Path(remoteDir))) {
59 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60 _fs.delete(new Path(remoteDir), true);
61 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62 }
63 LOG.info("Copying " + lpDir + " to " + remoteDir);
64 _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65 LOG.info("Copied " + lpDir + " to " + remoteDir);
66 progress();
67 }
68 _localManager.cleanup();
69 }
70
71 private void progress() {
72 if(_progressable!=null) _progressable.progress();
73 }
74 }

清单 2:从 ElephantDB 中摘录的某个 RecordWriter 子类

ElephantDB 的工作方式是通过跨越若干个 LocalPersistence 对象(BerkeleyDB 文件)来对数据进行分片(划分)。ElephantRecordWriter 类中的 write 函数拿到分片 ID,并检查该分片是否已经打开(第 28 行),如果没有则打开并创建一个新的本地文件(第 33 行)。第 38 行的 updateElephant 调用将作业输出的键值对写入到 BerkeleyDB 文件。

当关闭 ElephantRecordWriter 时,该类在第 64 行会复制 BerkeleyDB 文件到 HDFS 中,且可以随意选择是否覆盖旧文件。接下去的 progress 方法调用会通知 Hadoop 当前的 RecordWriter 正在按计划进行,这有点类似于真实 Map/Reduce 作业中的状态或计数器更新。

下一步是利用 ElephantRecordWriter 来实现 OutputFormat。要理解此清单中的代码,重点是了解 Hadoop JobConf 对象封装了什么。顾名思义,JobConf 对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及 mapper 和 reducer 类。清单 3 展示了两个自定义类是如何共同工作的:

复制代码
1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2 public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4 public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5 return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6 }
7
8 public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9 Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10 fs = Utils.getFS(args.outputDirHdfs, conf);
11 if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12 throw new InvalidJobConfException("Speculative execution should be false");
13 }
14 if(fs.exists(new Path(args.outputDirHdfs))) {
15 throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16 }
17 if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18 throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19 }
20 }
21 }

清单 3:从 ElephantDB 中摘录的某个 OutputFormat 实现

正如前面所看到的,OutputFormat 有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat 的数据存储位置是通过检查 JobConf 以及在第 14 和 17 行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由 getRecordWriter 函数处理,它的返回结果是清单 2 中的 ElephantRecordWriter 对象。

从 Hadoop 的角度来看,当 Map/Reduce 作业结束并且每个 reducer 产生了键值对流的时候,这些类会派上用场。Hadoop 会以作业配置为参数调用 checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用 getRecordWriter 以返回可以写入流数据的对象。当所有的键值对都被写入后,Hadoop 会调用 writer 中的 close 函数,将数据提交到 HDFS 并结束该 reducer 的职责。

总结

OutputFormat 是 Hadoop 框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现 OutputFormat 和覆写几个方法一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。

关于作者

Jim Blomo ( @jimblomo ) 热衷于开发强劲优雅的系统来操控数据。在 Yelp 公司中,他负责管理一个日益增长的数据挖掘团队,该团队使用 Hadoop,mrjob 以及 oddjob 处理 TB 级的数据。在加入 Yelp 公司前,他曾经为创业公司和 Amazon 构建基础设施。他喜爱美食,文化,以及同妻子一道漫步在旧金山湾区的户外。

查看英文原文: Exploring Hadoop OutputFormat


感谢侯伯薇对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2012-01-20 00:0010337
用户头像

发布了 125 篇内容, 共 43.7 次阅读, 收获喜欢 5 次。

关注

评论

发布
暂无评论
发现更多内容

霸榜巨作!阿里内部顶级大佬整理(Redis 5设计与源码分析)

钟奕礼

Java 程序员 java面试 java编程

CorelDraw2023主要功能特性

茶色酒

CorelDraw2023 CorelDraw

三到五年互联网公司Java面试题大全

钟奕礼

Java 程序员 java面试 java编程

LeetCode题解:783. 二叉搜索树节点最小距离,栈,JavaScript,详细注释

Lee Chen

JavaScript 算法 LeetCode

这20道微服务面试题,阿里、字节、美团、百度面试都问了

钟奕礼

Java 程序员 java面试 java编程

Camtasia2023免费电脑录屏视频软件使用教程

茶色酒

Camtasia Camtasia2023

网络核心笔记(一)

lxmoe

学习笔记 网络 11月月更

网络核心笔记(二)

lxmoe

学习笔记 网络 11月月更

8年Java开发含泪刷题,架构岗现在好难进,有点崩溃

钟奕礼

Java 程序员 java面试 java编程

GitHub标星75k,阿里15W字的Spring高级文档(全彩版),真的太香了

程序知音

Java spring ssm java架构 后端技术

API渗透测试4个关键步骤

阿泽🧸

11月月更 API渗透测试

如何召回流失用户

穿过生命散发芬芳

11月月更 流失召回

2022年华为Java面经,还没搞懂JVM

钟奕礼

Java 程序员 Java 面试 java编程

赞不绝口!仅靠阿里P9分享的 Redis 工作手册,拿到60W年薪Offer

程序知音

Java 数据库 redis 后端技术 Redis 6.0

有限状态机

芯动大师

Verilog 11月月更 Melay FSM

架构误区系列5:滥用分布式锁

agnostic

分布式锁

从基础到实战,阿里巴巴高并发系统设计全彩版手册限时开源

Java全栈架构师

程序员 面试 程序员人生 高并发 架构师

2022成功入职阿里:阿里的三套Java研发岗面试题总结(文末有答案)

钟奕礼

Java java面试 java编程 程序员、

CleanMyMac2023Mac系统电脑磁盘优化软件

茶色酒

CleanMyMac CleanMyMac2023

Redis分布式锁剖析和几种客户端的实现

C++后台开发

redis 分布式 后端开发 C++开发

算法题学习---链表的奇偶重排

桑榆

算法题 11月月更

精选2022年大厂高频Java面试真题集锦(含答案),面试一路开挂

程序知音

java面试 大厂面试 java架构 后端技术 Java面试八股文

仅hashmap一道面试题我就搞定了面试官成功入职面试官:我裂开了

钟奕礼

Java java面试 java编程 程序员、

阿里、百度、美团、面试题大集合,愿你更轻松拿下大厂offer

钟奕礼

Java java面试 java编程 程序员、

ABBYY FineReader16最新版PDF编辑器功能介绍

茶色酒

abbyy

FL Studio21最新版编曲DJ舞曲制作软件

茶色酒

FL Studio FL Studio 21

聊聊香港优才-续篇(58/100)

hackstoic

香港优才

Scrum Patterns:产品的自豪感(Product Pride)

Bruce Talk

Scrum 敏捷 Agile Scrum Patterns

亿级万物互联新时代的物联网消息中间件EMQX调研

宋小生

物联网 mqtt emqx

架构实战营模块 5 作业

陌生流云

架构实战营

一篇文章彻底理解 HDFS 的安全模式

明哥的IT随笔

hadoop hdfs

探索Hadoop OutputFormat_大数据_Jim.Blomo_InfoQ精选文章