揭秘 InputFormat:掌控 Map Reduce 任务执行的利器

阅读数:3122 2012 年 1 月 9 日

话题:DevOps大数据AI

随着越来越多的公司采用 Hadoop,它所处理的问题类型也变得愈发多元化。随着 Hadoop 适用场景数量的不断膨胀,控制好怎样执行以及何处执行 map 任务显得至关重要。实现这种控制的方法之一就是自定义 **InputFormat** 实现。

InputFormat 类是 Hadoop Map Reduce 框架中的基础类之一。该类主要用来定义两件事情:

  • 数据分割 (Data splits)
  • 记录读取器 (Record reader)

数据分割 是 Hadoop Map Reduce 框架中的基础概念之一,它定义了单个 Map 任务的大小及其可能的执行服务器信息。记录读取器 主要负责从输入文件实际读取数据并将它们(以键值对的形式)提交给 mapper。尽管有不少文章介绍过怎样实现自定义的 _ 记录读取器 _(例如,参考文章 [1]),但是关于如何进行分割(split)的介绍却相当粗略。这里我们将会解释什么是分割,并介绍怎样实现自定义分割来完成特定任务。

剖析分割

任何分割操作的实现都继承自 Apache 抽象基类——InputSplit,它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而 _ 分割位置 _ 是分割所在的机器结点名称组成的列表,其中待分割的数据都会于本地存在。分割位置可以方便调度器决定在哪个机器上执行此次分割。简化后的[1]作业跟踪器 (job tracker) 工作流程如下:

  • 接受来自某个任务跟踪器 (task tracker) 的心跳通信,得到该位置 map 的可用情况。
  • 为队列等候中的分割任务找到可用的“本地”结点。
  • 向任务跟踪器提交分割请求以待执行。

数据局部性 (Locality) 的相关程度因存储机制和整体的执行策略的不同而不同。例如,在 Hadoop 分布式文件系统 (HDFS) 中,分割通常对应一个物理数据块大小以及该数据块物理定位所在的一系列机器(其中机器总数由复制因子定义)的位置。这就是 **FileInputFormat** 计算分割的过程。

而 HBase 的实现则采用了另外一套方法。在 HBase 中,分割 对应于一系列属于某个表区域 (table region) 的表键 (table keys),而 _ 位置 _ 则为正在运行区域服务器的机器。

计算密集型应用

Map Reduce 应用中有一类特殊的应用叫做计算密集型应用(Compute-Intensive application)。这类应用的特点在于 Mapper.map() 函数执行的时间要远远长于数据访问的时间,且至少要差一个数量级。从技术角度来说,虽然这类应用仍然可以使用“标准”输入格式的实现,但是它会带来数据存放结点过少而集群内剩余结点没能充分利用的问题(见图 1)。

(点击图片进行放大)

图 1:数据局部性情况下的结点使用图

图 1 中显示了针对计算密集型应用,使用“标准”数据局部性导致的结点使用率上的巨大差异——有些结点(红色标注)被过度使用,而其他结点(黄色和浅绿色标注)则使用不足。由此可见,在针对计算密集型应用时,需要重新思考对“局部性”概念的认识。在这种情况下,“局部性”意味着所有可用结点之间 map 任务的均匀分布——即最大化地使用集群机器的计算能力。

使用自定义 InputFormat 改变“局部性”

假定源数据以文件序列的形式存在,那么一个简单的 **ComputeIntensiveSequenceFileInputFormat** 类(见清单 1)便可以实现将分割生成的结果均匀地分布在集群中的所有服务器上。

复制代码
package com.navteq.fr.mapReduce.InputFormat;
<p>import java.io.IOException;<br></br>import java.util.ArrayList;<br></br>import java.util.Collection;<br></br>import java.util.List;<br></br>import java.util.StringTokenizer;</p><p>import org.apache.hadoop.fs.FileStatus;<br></br>import org.apache.hadoop.fs.Path;<br></br>import org.apache.hadoop.mapred.ClusterStatus;<br></br>import org.apache.hadoop.mapred.JobClient;<br></br>import org.apache.hadoop.mapred.JobConf;<br></br>import org.apache.hadoop.mapreduce.InputSplit;<br></br>import org.apache.hadoop.mapreduce.JobContext;<br></br>import org.apache.hadoop.mapreduce.lib.input.FileSplit;<br></br>import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;</p><p>public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {</p><p>          private static final double SPLIT_SLOP = 1.1; // 10% slop<br></br>          static final String NUM_INPUT_FILES = "mapreduce.input.num.files";</p><p>          /**<br></br>          * Generate the list of files and make them into FileSplits.<br></br>          */ <br></br>          @Override<br></br>          public List<InputSplit> getSplits(JobContext job) throws IOException {<br></br>           </p><p>          long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));<br></br>          long maxSize = getMaxSplitSize(job);</p><p>          // get servers in the cluster<br></br>          String[] servers = getActiveServersList(job);<br></br>          if(servers == null)<br></br>                     return null;<br></br>          // generate splits<br></br>          List<InputSplit> splits = new ArrayList<InputSplit>();<br></br>          List<FileStatus>files = listStatus(job);<br></br>          int currentServer = 0;<br></br>          for (FileStatus file: files) {<br></br>                    Path path = file.getPath();<br></br>                    long length = file.getLen();<br></br>                    if ((length != 0) && isSplitable(job, path)) { <br></br>                              long blockSize = file.getBlockSize();<br></br>                              long splitSize = computeSplitSize(blockSize, minSize, maxSize);</p><p>                              long bytesRemaining = length;<br></br>                              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {<br></br>                                        splits.add(new FileSplit(path, length-bytesRemaining, splitSize, <br></br>                                                                  new String[] {servers[currentServer]}));<br></br>                              currentServer = getNextServer(currentServer, servers.length);<br></br>                              bytesRemaining -= splitSize;<br></br>                    }</p><p>                    if (bytesRemaining != 0) {<br></br>                              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, <br></br>                                                new String[] {servers[currentServer]}));<br></br>                              currentServer = getNextServer(currentServer, servers.length);<br></br>                    }<br></br>          } else if (length != 0) {<br></br>                    splits.add(new FileSplit(path, 0, length, <br></br>                                     new String[] {servers[currentServer]}));<br></br>                    currentServer = getNextServer(currentServer, servers.length);<br></br>          } else { <br></br>                    //Create empty hosts array for zero length files<br></br>                    splits.add(new FileSplit(path, 0, length, new String[0]));<br></br>          }<br></br>      }</p><p>      // Save the number of input files in the job-conf<br></br>      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());</p><p>      return splits;<br></br>   }</p><p>   private String[] getActiveServersList(JobContext context){</p><p>      String [] servers = null;<br></br>       try {<br></br>                JobClient jc = new JobClient((JobConf)context.getConfiguration()); <br></br>                ClusterStatus status = jc.getClusterStatus(true);<br></br>                Collection<String> atc = status.getActiveTrackerNames();<br></br>                servers = new String[atc.size()];<br></br>                int s = 0;<br></br>                for(String serverInfo : atc){<br></br>                         StringTokenizer st = new StringTokenizer(serverInfo, ":");<br></br>                         String trackerName = st.nextToken();<br></br>                         StringTokenizer st1 = new StringTokenizer(trackerName, "_");<br></br>                         st1.nextToken();<br></br>                         servers[s++] = st1.nextToken();<br></br>                }<br></br>      }catch (IOException e) {<br></br>                e.printStackTrace();<br></br>      }</p><p>      return servers;</p><p>  }</p><p>  private static int getNextServer(int current, int max){</p><p>      current++;<br></br>      if(current >= max)<br></br>                current = 0;<br></br>      return current;<br></br>  }<br></br>}</p>

清单 1:ComputeIntensiveSequenceFileInputFormat 类

该类继承自 SequenceFileInputFormat 并重写了 **getSplits()方法,虽然计算分割的过程与FileInputFormat** 完全一样,但是它为分割指定了“局部性”,以便从集群中找出可用的服务器。getSplits() 利用到了以下两个方法:

  • getActiveServersList() 方法,返回集群中当前可用的服务器(名称)组成的数组。
  • getNextServer() 方法,返回服务器数组中下一个服务器索引,且当服务器数组中元素全部用尽时,返回数组头部重新开始。

虽然上述实现(见清单 1)将 map 任务的执行均匀地分布在了集群中的所有服务器上,但是它却完全忽略了数据的实际位置。稍微好点的 **getSplits** 方法实现(见清单 2)可以试图将两种策略结合在一起:既尽量多地放置针对数据的本地作业,且保持剩余作业在集群上的良好平衡。[2]

<span color="#000000"><b>public</b> List<<span><span color="#000000">InputSplit</span></span>> getSplits(JobContext job) <b>throws</b> IOException { </span><p><span color="#000000">         // get splits<br></br>        List<<span>InputSplit</span>> originalSplits = <b>super</b>.getSplits(job);</span></p><p><span color="#000000">         // Get active servers<br></br>        String[] servers = getActiveServersList(job);<br></br>        <b>if</b>(servers == <b>null</b>)<br></br>                <b>return</b> <b>null</b>;<br></br>        // reassign splits to active servers<br></br>        List<<span>InputSplit</span>> splits = <b>new</b> ArrayList<<span>InputSplit</span>>(originalSplits.size());<br></br>        <b>int</b> numSplits = originalSplits.size();<br></br>        <b>int</b> currentServer = 0;<br></br>        <b>for</b>(<b>int</b> i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer, <br></br>                                                                           servers.length)){<br></br>                String server = servers[currentServer]; // Current server<br></br>                <b>boolean</b> replaced = <b>false</b>;<br></br>                // For every remaining split<br></br>                <b>for</b>(<span>InputSplit</span> split : originalSplits){<br></br>                        FileSplit fs = (FileSplit)split;<br></br>                        // For every split location<br></br>                        <b>for</b>(String l : fs.getLocations()){<br></br>                                 // If this split is local to the server<br></br>                                 <b>if</b>(l.equals(server)){<br></br>                                           // Fix split location<br></br>                                           splits.add(<b>new</b> FileSplit(fs.getPath(), fs.getStart(),<br></br>                                                            fs.getLength(), <b>new</b> String[] {server}));<br></br>                                           originalSplits.remove(split);<br></br>                                           replaced = <b>true</b>;<br></br>                                           <b>break</b>;<br></br>                                 }<br></br>                        }<br></br>                        <b>if</b>(replaced)<br></br>                                 <b>break</b>;<br></br>                }<br></br>                // If no local splits are found for this server<br></br>                <b>if</b>(!replaced){<br></br>                        // Assign first available split to it <br></br>                        FileSplit fs = (FileSplit)splits.get(0);<br></br>                        splits.add(<b>new</b> FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), <br></br>                                                                             <b>new</b> String[] {server}));<br></br>                        originalSplits.remove(0);<br></br>                }<br></br>        }<br></br>        <b>return</b> splits;<br></br>}</span></p>清单 2:优化过的 getSplits 方法

在此实现中,我们首先使用父类 (FileInputSplit) 来得到包含位置计算在内的分割以确保数据局部性。然后我们计算出可用的服务器列表,并为每一个存在的服务器尝试分配与其同处本地的分割。

延迟公平调度

虽然清单 1 和清单 2 中的代码都正确地计算出了分割位置,但当我们试图在 Hadoop 集群上运行代码时,就会发现结果与服务器之间产生均匀分布相去甚远。参考文章 [2] 中很好的描述了我们观察到的这个问题,并为该问题描述了一种解决方案——即延迟公平调度。

假设已经设置好了公平调度程序,那么下面的程序段应当加入到 **mapred-site.xml** 文件中以激活某个延迟调度程序[3]:

复制代码
<property>
        <name>mapred.fairscheduler.locality.delay</name>
        <value>360000000</value>
<property>

适当借助延迟公平调度程序,作业执行将可以利用整个集群(见图 2)。此外,根据我们的实验,这种情况下的执行时间相比“数据局部性”的做法要节省约 30%。

(点击图片进行放大)

图 2:执行局部性情况下的结点使用图

其他注意事项

用于测试的计算作业共使用了 96 个 split 和 mapper 任务。测试集群拥有 19 个数据结点,其中每个数据结点拥有 8 个 mapper 槽,因此该集群共有 152 个可用槽。当该作业运行时,它并没有充分利用集群中的所有槽。

Ganglia 的两份截图都展示了我们所使用的测试集群,其中前三个结点为控制结点,而第四个结点为边缘结点,主要用来启动作业。图中展示了中央处理器 / 机器的负载情况。在图 1 中,有一些结点被过度使用(红色显示),而集群中的其他结点则未得到充分利用。在图 2 中,虽然我们得到了更加平衡的分布,然而集群仍然未被充分利用。用于测试的作业也可以运行多线程,这么做会增加中央处理器的负载,但同时也会降低在每次 **Mapper.map()** 迭代上的整体时间花费。正如图 3 所示,通过增加线程数量,我们可以更好地利用集群资源,并进一步减少完成作业所花费的时间。通过改变作业区域性,我们可以在不牺牲性能的情况下更好地利用群集处理远程作业数据。

(点击图片进行放大)

图 3:使用多线程 Map 作业的执行区域性情况下的结点使用图

即使机器中央处理器处于高负荷状态,它仍然可以允许其他磁盘 I/O 密集型作业运行在开放槽中,要注意的是,这么做会带来些许的性能下降。

自定义分割

本文中提到的方法对大文件非常适用,但是对于小文件而言,并没有足够的分割来让其使用集群中的多台机器。一种可行的方法是使用更小的分割块,但是这么做会给集群命名结点带来更多的负担(内存需求方面)。一种更好的做法是修改清单 1 中的代码,以使用自定义的块大小(而不是文件块大小)。这种方法可以计算出所需的分割块数量,而不用理会实际的文件大小。

总结

在这篇文章中,我们已经展示了如何利用自定义的 **InputFormat**s 来更紧密地控制 Map Reduce 中的 map 任务在可用服务器间的分布。这种控制对于一类特殊应用——计算密集型应用非常重要,控制过程将 Hadoop Map Reduce 做为通用的并行执行框架使用。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理、处理以及 SOA 定义架构愿景,并且实施各种 NAVTEQ 的项目。他还是 InfoQ 的 SOA 编辑,OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者并经常发表演讲,他最新的一本书叫做《Applied SOA》。

Michael Segel在过去二十多年里一直不断与客户合作,帮助他们发现并解决业务上的问题。Michael 做过许多不同类型的工作,也在不同的行业圈摸打滚爬过。他是一位独立顾问,并且总是期望能够解决所有具有挑战性的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考

1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.

2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling.


[1] 这是一个简化后的解释。真实的调度算法要复杂得多;考虑更多的参数而不仅仅是分割的位置。

[2] 虽然我们将其列作一个选项,但是如果花费在 Mapper.map() 方法上的时间高与远程访问数据时间的一个或多个数量级时,清单 1 中的代码将不会有任何性能上的提升。但尽管如此,它也许会带来网络利用率上的略微提高。

[3] 请注意这里的延迟以毫秒为单位,在改变该值之后需要重新启动作业跟踪器。

查看英文原文:Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.