写点什么

HDFS Federation

2019 年 9 月 27 日

HDFS Federation

1 背景

当前 Hadoop 集群版本是 2.7.3,社区最新 Hadoop 生产版本是 Hadoop3.2.0(rbf1.0),随着集群规模及数据量的增加,单 NameNode 产生了性能及稳定性瓶颈:


  • 堆内存:NameNode 堆内存限制,目前文件目录 3.15 亿,块 3.05 亿,(namespace 占 64G,blockmap 占 54G,约 118G),namenode 堆内存设置 180G

  • 性能:越来越高的读写并发,NameNode 的粗粒度元数据锁,RPC QPS 成为瓶颈,集群能力受限单一 NameNode 限制,经过服务端口业务端口分开,callqueue 等优化,namenode 响应,Balance 和 Mover 速度较慢

  • 除此之外,重启 Namenode 时间较长(小时级)也给集群运维工作带来不便

  • 解决方案 federation+viewfs+fastcopy


2 大数据架构


3 基本概念

Federation

为了 namespace 横向扩展,federation 使用多个各自独立的 Namenodes/namespaces.。这些 Namenodes 组成联邦组织,每个 namenode 之间不需要交流.。所有的 namenode 共享所有的 datanode 作为存储,每一个 datanode 都在集群中所有的 namenode 注册自己的信息.,而且 Datanodes 需要阶段性地向所有的 namenode 发送心跳信息和块信息报告,处理来自 namenode 的命令。


多 namenode 拥有一个集群的编号 clusterid,如果某个 namenode 格式化时,不是使用相同的 clusterid 说明处于不同的集群。Block pool(块池)就是属于单个命名空间的一组 block(块)。Datanode 是一个物理概念,而 block pool 是一个重新将 block 划分的逻辑概念,同一个 datanode 中可以存着属于多个 block pool 的多个块。



Viewfs


视图文件系统(View File System ,ViewFs)提供了管理多个 Hadoop 文件系统命名空间的方式,该系统在 HDFS federation 的集群中有多个 NameNode(因此有多个命名空间)是特别有用。


mounttable

目录挂载配置表,我们做到 hdfs 目录和 viewfs 目录一致。



Fastcopy

原理:


  • 查询待拷贝文件的所有 block 块信息。

  • 获取这些源文件块信息所在的位置信息。

  • 对于源文件的每个块,在 NameNode 内部对应创建一个空的目标块,这些目标块的存储位置尽可能与源块一致。

  • 然后命令 DataNode 做一个本地的块拷贝操作(硬链接)。

  • 然后等待块拷贝完成操作,然后上报到 NameNode 上。


二次开发:由于HDFS-2139是单客户端执行方式,相比distcp有数十倍的性能提升,但对于拷贝上亿文件快,几P的数据速度仍然无法满足需求,将fastcp改为分布式模式执行。
hadoop distfastcp -strategy dynamic -prbugpcaxt -update -skipcrccheck -i -delete hdfs://ip:9000/$1 hdfs://ip:9000/$1
我们将distcp数据拷贝逻辑改造为fastcp逻辑,将distcp中的CopyMapper中的copyFileWithRetry改为调用fastcopy FastCopy.CopyResult c = fcp.copy(sourceFileStatus.toString(), target.toString(), (DistributedFileSystem) srcFileSys,(DistributedFileSystem) targetFS);
解决map数据倾斜(利用dynamic strategy,同时将UniformSizeInputFormat的getSplits按照文件个数划分)解决文件更新覆盖删除等问题扩展acl属性及存储策略的拷贝 BlockStoragePolicy[] policies = ((DistributedFileSystem)fileSystem).getStoragePolicies(); HdfsFileStatus status = ((DistributedFileSystem)fileSystem).getClient().getFileInfo(fileStatus.getPath().makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()).toUri().getPath()); byte storagePolicyId = status.getStoragePolicy(); for (BlockStoragePolicy p : policies) { if (p.getId() == storagePolicyId) { copyListingFileStatus.setStoragePolicy(p.getName()); } } if(srcFileStatus.getStoragePolicy()!=null) { ((DistributedFileSystem) targetFS).setStoragePolicy(path, srcFileStatus.getStoragePolicy()); }
多线程列表生成器,如果分区文件较大,单线程列表生成器成为瓶颈导致任务启动慢 if (explore) { CountDownLatch countDownLatch = new CountDownLatch(sourceFiles.length); ExecutorService listFileExecutor= Executors.newFixedThreadPool(100); for (FileStatus sourceStatus : sourceFiles) { listFileExecutor.execute(new ListFile(countDownLatch,sourceFS,sourceStatus,sourcePathRoot,options,preserveAcls,preserveXAttrs,preserveRawXAttrs)); } countDownLatch.await(); listFileExecutor.shutdown(); }
复制代码


以下为 fastcp 和 distcp 的效率对比:



4Balance&Mover

balancer 是当 hdfs 集群中一些 datanodes 的存储要写满了或者有空白的新节点加入集群时,用于均衡 hdfs 集群磁盘使用量的一个工具 mover 这个工具用户归档数据,它类似于 Balancer(移动数据方面)。


MOVER 定期扫描 HDFS 文件,检查文件的存放是否符合它自身的存储策略。如果数据块不符合自己的策略,它会把数据移动到该去的地方。fastcp 是通过硬链接的方式来加速数据的拷贝,但同时会占用磁盘的大小,fastcp 前需要保证 datanode 各磁盘有充足的磁盘空间,避免由于空间不足带来的拷贝降级为物理数据拷贝。


由于我们使用了基于 ZFS 的透明压缩方案,即 datanode 设置一定比例的 ZFS 盘,通过 hdfs 的冷热数据分层存储来达到对用户透明的压缩方案。mover 进程的长时间运行使一些 zfs 磁盘空间早已达到 98%,经过一段时间的实验单纯通过优化 balance 命令无法降低这些 zfs 磁盘的空间使用率,因为 mover 程序优先选择在同一 datanode 内移动数据,当 balance 程序释放一定的磁盘空间后,mover 程序又会将磁盘空间打满,最后我们通过限定 mover 程序 datanode 范围的方式来降低 zfs 磁盘的空间使用率,同时 balance 程序区分不同的存储策略来 balance 数据,当高磁盘空间占用小于 85%后我们再进行 fastcp。


针对不同存储策略的 balance:


    for (DatanodeStorageReport r : reports) {          final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());          LOG.info("DFSConfigKeys.DFS_BALANCE_ONLY_DISK :"+ typeStore );          if (this.typeStore) {              for (StorageType t : StorageType.getMovableTypes()) {                  if ("DISK".equals(t.name())) {                      ...                      dispatcher.getStorageGroupMap().put(g);                  }              }          } else {                  for (StorageType t : StorageType.getMovableTypes()) {                        ...                 }          }

mover限定磁盘占用率高server:
static HashSet<String> getNameNodePathsToMove(Configuration conf, String... args) throws Exception { final Options opts = buildCliOptions(); CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(opts, args, true); return getNameNodePaths(commandLine, conf); } private static HashSet<String> getNameNodePaths(CommandLine line, Configuration conf) throws Exception {
HashSet hashSet = new HashSet(); BufferedReader reader = new BufferedReader( new InputStreamReader(new FileInputStream(line.getOptionValue("d")), "UTF-8")); try { String line; while ((line = reader.readLine()) != null) { if (!line.trim().isEmpty()) { hashSet.add(line1); } } } finally { IOUtils.cleanup(LOG, reader); } return hashSet; }
复制代码




5 测试

**功能**



**兼容性**



6 相关 Patch

https://issues.apache.org/jira/browse/HADOOP-12253


https://issues.apache.org/jira/browse/HIVE-11364


(hive 配置, hive.exec.stagingdir)


https://issues.apache.org/jira/browse/HIVE-10790


(orc 表空目录读取,新分区写入,spark 依赖此包)


https://issues.apache.org/jira/browse/HIVE-11920


(add jar failing with url schemes)


https://issues.apache.org/jira/browse/HDFS-2139


(fastcopy,需更新 datanode 代码 hardLink)


7 数据划分方式

做好合理的数据划分方式是解决问题的关键,主要由两种方式:


  • 根据 hdfs 目录划分

  • 根据表的分区划分


根据 hdfs 目录划分,可以对 namenode 流量进行分流,做到业务数据分割,但需要对客户端有极强的掌控能力,拷贝数据时需要较长的时间,集群停服务时间达数小时以上。根据表的分区划分,可以将历史冷数据拷贝至单独的 namenode,但会导致 mounttabl 异常复杂,历史数据一旦变更会有较大的风险。


结合数据特点和业务需求及 namenode 的瓶颈,我们决定将 stg 层数据迁移至新的 namenode,目前 stg 层数据主要有三种接入方式,flume,sqoop,databus(贝壳找房自研数据流转工具),我们对这三个工具有较强的掌控力。迁移时再根据分区进行分批迁移,来解决集群长时间停服务对实时任务及 etl 任务的影响,做到平稳迁移。同时我们也把 yarn.log-aggregation 目录迁移至新的 namenode。


8 域名格式兼容性

通过运营手段解决 viewfs 兼容性问题:


  • hdfs:///user

  • hive 创建 temproty 表

  • load data inpath(sqoop)

  • flume 日志收集文件无法正常关闭

  • hivemeta 建表兼容性(迁移数据过度时期)


9Viewfs 上线步骤

  • 根据 hdfs-audit 确定读写 stg 数据客户端,并进行客户端 viewfs 升级(hadoop,hive,tez,spark,flume,sqoop)

  • 解决客户端程序 viewfs 兼容性问题

  • 修改 flume 导数据配置将 tmp 目录修改至新的 namenode 目录

  • stg 层历史数据分阶段 mv 至新目录进行 fastcp 同步数据

  • 修改 hive 对应表的元数据信息


10 总结 &展望

federation 一期我们主要迁移 stg 层数据,可降低 namenode 内存 30%,后续随着数据的增加我们再进行不同业务间的数据迁移 viewfs 需要对 hadoop 客户端有一定的掌控能力,我们也会对 RBF 进行调研引入,屏蔽 mounttable 的修改对客户端的影响,RouterFederation 每个 NameNode 作为一个子集群挂载到 RF 层,挂载信息保存在 Zookeeper 中。


RF 可以具有多个 DfsRouter 进程,负责接收用户的请求,并从 Zookeeper 中获取 Mount 信息。同时,每个 NameNode 会定时向 RF 发送心跳信息,RF 会感知 NameNode 的存活以及在 HA 状态下的 Active 状态。



作者介绍:


数大大(企业代号名),贝壳找房大数据工程师,大数据架构团队成员,目前主要负责 hadoop,hive 集群。


本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。


原文链接:


https://mp.weixin.qq.com/s/ke5IBpHERWWwfxvsblMqOQ


2019 年 9 月 27 日 11:17494

评论

发布
暂无评论
发现更多内容

聊聊Java中的Thread类

geekymv

线程 Java25周年 Thread Runnable

架构师训练营第一课

于成

week01 UML 学习总结

李锦

第一周总结

LEAF

老当益壮的 Servlet

侯树成

Java Java 25 周年 Servlet

低调的网易又要上市了

池建强

创业 网易 慢公司

架构师训练营第一周(总结)

任鉴非

Week1命题作业

星河寒水

部署图 时序图 组件图 用例图

第1周 - 学习总结

大海

架构师训练营 No.1 周作业

连增申

架构师训练营第一周总结

Kiroro

架构师训练营-作业-第一讲

吕浩

极客大学架构师训练营

食堂就餐卡系统设计

Kiroro

第二章.软件架构设计

西柚

架构师训练营作业一:食堂就餐卡系统设计

sunnywhy

架构师作业一:食堂就餐卡系统设计

李锦

架构师训练营第1周_学习总结

chinsun1

架构总结

学习总结--Week1

吴炳华

极客大学架构师训练营

Hello World!

东哥

极客大学架构师训练营

食堂就餐卡系统设计

慢慢来的比较快

孤狼王兴 | 互联网大佬往事

刘燕

AI 企业管理 美团

食堂就餐卡系统架构设计

dj_cd

极客大学架构师训练营

架构师训练营-学习总结-第一周

ashuai1106

学习 架构师 极客大学架构师训练营

架构建模总结

任鉴非

【架构】— 一个简单系统的UML模型

不二架构

极客大学架构师训练营 UML 架构总结

UML作业

王志祥

极客大学架构师训练营

为什么建立自己的规则很重要

Neco.W

自我管理 行动派 执行力

一篇文章快速搞懂 Atomic(原子整数/原子引用/原子数组/LongAdder)

学习Java的小姐姐

Java 并发编程 并发 synchronized Atomic

架构师训练营Week1总结

sunnywhy

食堂就餐卡系统设计

Young

食堂就餐卡系统设计

LEAF

2021年全国大学生计算机系统能力大赛操作系统设计赛 技术报告会

2021年全国大学生计算机系统能力大赛操作系统设计赛 技术报告会

HDFS Federation-InfoQ