HDFS Federation

2019 年 9 月 27 日

HDFS Federation

1 背景

当前 Hadoop 集群版本是 2.7.3, 社区最新 Hadoop 生产版本是 Hadoop3.2.0(rbf1.0), 随着集群规模及数据量的增加,单 NameNode 产生了性能及稳定性瓶颈:

  • 堆内存:NameNode 堆内存限制,目前文件目录 3.15 亿,块 3.05 亿,(namespace 占 64G,blockmap 占 54G,约 118G),namenode 堆内存设置 180G

  • 性能:越来越高的读写并发,NameNode 的粗粒度元数据锁,RPC QPS 成为瓶颈,集群能力受限单一 NameNode 限制,经过服务端口业务端口分开,callqueue 等优化,namenode 响应,Balance 和 Mover 速度较慢

  • 除此之外,重启 Namenode 时间较长 (小时级) 也给集群运维工作带来不便

  • 解决方案 federation+viewfs+fastcopy

2 大数据架构

3 基本概念

Federation

为了 namespace 横向扩展,federation 使用多个各自独立的 Namenodes/namespaces.。这些 Namenodes 组成联邦组织,每个 namenode 之间不需要交流.。所有的 namenode 共享所有的 datanode 作为存储,每一个 datanode 都在集群中所有的 namenode 注册自己的信息.,而且 Datanodes 需要阶段性地向所有的 namenode 发送心跳信息和块信息报告,处理来自 namenode 的命令。

多 namenode 拥有一个集群的编号 clusterid,如果某个 namenode 格式化时,不是使用相同的 clusterid 说明处于不同的集群。Block pool(块池) 就是属于单个命名空间的一组 block(块)。Datanode 是一个物理概念,而 block pool 是一个重新将 block 划分的逻辑概念,同一个 datanode 中可以存着属于多个 block pool 的多个块。

Viewfs

视图文件系统(View File System ,ViewFs)提供了管理多个 Hadoop 文件系统命名空间的方式,该系统在 HDFS federation 的集群中有多个 NameNode(因此有多个命名空间)是特别有用。

mounttable

目录挂载配置表, 我们做到 hdfs 目录和 viewfs 目录一致。

Fastcopy

原理:

  • 查询待拷贝文件的所有 block 块信息。
  • 获取这些源文件块信息所在的位置信息。
  • 对于源文件的每个块,在 NameNode 内部对应创建一个空的目标块,这些目标块的存储位置尽可能与源块一致。
  • 然后命令 DataNode 做一个本地的块拷贝操作 (硬链接)。
  • 然后等待块拷贝完成操作,然后上报到 NameNode 上。
复制代码
二次开发:
由于 HDFS-2139 是单客户端执行方式,相比 distcp 有数十倍的性能提升,但对于拷贝上亿文件快,几 P 的数据速度仍然无法满足需求,将 fastcp 改为分布式模式执行。
hadoop distfastcp -strategy dynamic -prbugpcaxt -update -skipcrccheck -i -delete
hdfs://ip:9000/$1 hdfs://ip:9000/$1
我们将 distcp 数据拷贝逻辑改造为 fastcp 逻辑,将 distcp 中的 CopyMapper 中的 copyFileWithRetry 改为调用 fastcopy
FastCopy.CopyResult c = fcp.copy(sourceFileStatus.toString(), target.toString(),
(DistributedFileSystem) srcFileSys,(DistributedFileSystem) targetFS);
解决 map 数据倾斜(利用 dynamic strategy, 同时将 UniformSizeInputFormat 的 getSplits 按照文件个数划分)
解决文件更新覆盖删除等问题
扩展 acl 属性及存储策略的拷贝
BlockStoragePolicy[] policies = ((DistributedFileSystem)fileSystem).getStoragePolicies();
HdfsFileStatus status = ((DistributedFileSystem)fileSystem).getClient().getFileInfo(fileStatus.getPath().makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()).toUri().getPath());
byte storagePolicyId = status.getStoragePolicy();
for (BlockStoragePolicy p : policies) {
if (p.getId() == storagePolicyId) {
copyListingFileStatus.setStoragePolicy(p.getName());
}
}
if(srcFileStatus.getStoragePolicy()!=null) {
((DistributedFileSystem) targetFS).setStoragePolicy(path, srcFileStatus.getStoragePolicy());
}
多线程列表生成器,如果分区文件较大,单线程列表生成器成为瓶颈导致任务启动慢
if (explore) {
CountDownLatch countDownLatch = new CountDownLatch(sourceFiles.length);
ExecutorService listFileExecutor= Executors.newFixedThreadPool(100);
for (FileStatus sourceStatus : sourceFiles) {
listFileExecutor.execute(new ListFile(countDownLatch,sourceFS,sourceStatus,sourcePathRoot,options,preserveAcls,preserveXAttrs,preserveRawXAttrs));
}
countDownLatch.await();
listFileExecutor.shutdown();
}

以下为 fastcp 和 distcp 的效率对比:

4Balance&Mover

balancer 是当 hdfs 集群中一些 datanodes 的存储要写满了或者有空白的新节点加入集群时,用于均衡 hdfs 集群磁盘使用量的一个工具 mover 这个工具用户归档数据,它类似于 Balancer(移动数据方面)。

MOVER 定期扫描 HDFS 文件,检查文件的存放是否符合它自身的存储策略。如果数据块不符合自己的策略,它会把数据移动到该去的地方。fastcp 是通过硬链接的方式来加速数据的拷贝,但同时会占用磁盘的大小,fastcp 前需要保证 datanode 各磁盘有充足的磁盘空间,避免由于空间不足带来的拷贝降级为物理数据拷贝。

由于我们使用了基于 ZFS 的透明压缩方案,即 datanode 设置一定比例的 ZFS 盘,通过 hdfs 的冷热数据分层存储来达到对用户透明的压缩方案。mover 进程的长时间运行使一些 zfs 磁盘空间早已达到 98%,经过一段时间的实验单纯通过优化 balance 命令无法降低这些 zfs 磁盘的空间使用率,因为 mover 程序优先选择在同一 datanode 内移动数据,当 balance 程序释放一定的磁盘空间后,mover 程序又会将磁盘空间打满,最后我们通过限定 mover 程序 datanode 范围的方式来降低 zfs 磁盘的空间使用率,同时 balance 程序区分不同的存储策略来 balance 数据,当高磁盘空间占用小于 85% 后我们再进行 fastcp。

针对不同存储策略的 balance:

复制代码
for (DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
LOG.info("DFSConfigKeys.DFS_BALANCE_ONLY_DISK :"+ typeStore );
if (this.typeStore) {
for (StorageType t : StorageType.getMovableTypes()) {
if ("DISK".equals(t.name())) {
...
dispatcher.getStorageGroupMap().put(g);
}
}
} else {
for (StorageType t : StorageType.getMovableTypes()) {
...
}
}
mover 限定磁盘占用率高 server:
static HashSet<String> getNameNodePathsToMove(Configuration conf,
String... args) throws Exception {
final Options opts = buildCliOptions();
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args, true);
return getNameNodePaths(commandLine, conf);
}
private static HashSet<String> getNameNodePaths(CommandLine line,
Configuration conf) throws Exception {
HashSet hashSet = new HashSet();
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(line.getOptionValue("d")), "UTF-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
hashSet.add(line1);
}
}
} finally {
IOUtils.cleanup(LOG, reader);
}
return hashSet;
}


5 测试

** 功能 **

** 兼容性 **

6 相关 Patch

https://issues.apache.org/jira/browse/HADOOP-12253

https://issues.apache.org/jira/browse/HIVE-11364

(hive 配置, hive.exec.stagingdir)

https://issues.apache.org/jira/browse/HIVE-10790

(orc 表空目录读取,新分区写入,spark 依赖此包)

https://issues.apache.org/jira/browse/HIVE-11920

(add jar failing with url schemes)

https://issues.apache.org/jira/browse/HDFS-2139

(fastcopy,需更新 datanode 代码 hardLink)

7 数据划分方式

做好合理的数据划分方式是解决问题的关键,主要由两种方式:

  • 根据 hdfs 目录划分
  • 根据表的分区划分

根据 hdfs 目录划分,可以对 namenode 流量进行分流,做到业务数据分割,但需要对客户端有极强的掌控能力,拷贝数据时需要较长的时间,集群停服务时间达数小时以上。根据表的分区划分,可以将历史冷数据拷贝至单独的 namenode,但会导致 mounttabl 异常复杂,历史数据一旦变更会有较大的风险。

结合数据特点和业务需求及 namenode 的瓶颈,我们决定将 stg 层数据迁移至新的 namenode,目前 stg 层数据主要有三种接入方式,flume,sqoop,databus(贝壳找房自研数据流转工具),我们对这三个工具有较强的掌控力。迁移时再根据分区进行分批迁移,来解决集群长时间停服务对实时任务及 etl 任务的影响,做到平稳迁移。同时我们也把 yarn.log-aggregation 目录迁移至新的 namenode。

8 域名格式兼容性

通过运营手段解决 viewfs 兼容性问题:

  • hdfs:///user
  • hive 创建 temproty 表
  • load data inpath(sqoop)
  • flume 日志收集文件无法正常关闭
  • hivemeta 建表兼容性 (迁移数据过度时期)

9Viewfs 上线步骤

  • 根据 hdfs-audit 确定读写 stg 数据客户端,并进行客户端 viewfs 升级 (hadoop,hive,tez,spark,flume,sqoop)
  • 解决客户端程序 viewfs 兼容性问题
  • 修改 flume 导数据配置将 tmp 目录修改至新的 namenode 目录
  • stg 层历史数据分阶段 mv 至新目录进行 fastcp 同步数据
  • 修改 hive 对应表的元数据信息

10 总结 & 展望

federation 一期我们主要迁移 stg 层数据,可降低 namenode 内存 30%,后续随着数据的增加我们再进行不同业务间的数据迁移 viewfs 需要对 hadoop 客户端有一定的掌控能力,我们也会对 RBF 进行调研引入,屏蔽 mounttable 的修改对客户端的影响,RouterFederation 每个 NameNode 作为一个子集群挂载到 RF 层,挂载信息保存在 Zookeeper 中。

RF 可以具有多个 DfsRouter 进程,负责接收用户的请求,并从 Zookeeper 中获取 Mount 信息。同时,每个 NameNode 会定时向 RF 发送心跳信息,RF 会感知 NameNode 的存活以及在 HA 状态下的 Active 状态。

作者介绍:
数大大(企业代号名),贝壳找房大数据工程师,大数据架构团队成员,目前主要负责 hadoop,hive 集群。

本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。

原文链接:

https://mp.weixin.qq.com/s/ke5IBpHERWWwfxvsblMqOQ

2019 年 9 月 27 日 11:17 316

评论

发布
暂无评论
发现更多内容

架构0期Week4作业2

Nan Jiang

Linux 操作系统!开篇!!!

cxuan

Linux

Google官方MVP+Dagger2架构详解

小吴选手

架构 架构师 架构是训练营

「NIO系列」——之Reactor模型

小谈

Spring Boot reactor 后端 nio SpringCloud

如果是你,年薪80万和阿里P7月薪36K,会怎么选?

犬来八荒

Java 腾讯 面试 阿里 java面试

基于 Flagger 和 Nginx-Ingress 实现金丝雀发布

郭旭东

Kubernetes CI/CD

极客大学架构师训练营 系统架构 分布式缓存 一致性哈希 Hash 第9课 听课总结

John(易筋)

极客时间 极客大学 极客大学架构师训练营 分布式缓存 一致性哈希

Redis 开创者 Salvatore Sanfilippo 退出 Redis 的代码日常维护工作

lmymirror

技术人 Redis作者

信创舆情一线--英特尔暂停向浪潮供货

统小信uos

服务器 舆情 芯片

去面试Spring Cloud 被问的35个问题

小谈

Java 面试 springboot SpringCloud buffer JVM原理

MyBatis入门

Simon郎

Java mybatis

架构师训练营第五周总结

Melo

极客大学架构师训练营

如何快速将 Linux 系统制作成 ISO 镜像文件?

JackTian

Linux 运维 操作系统 镜像文件 ISO

面试腾讯被问JVM性能调优,勉强入职后,发现工资差了这么多

互联网架构师小马

Java 程序员 面试 性能优化 JVM

谈谈容器和K8s

Gabriel

Apache Beam学习笔记

BlueblueWings

产业数字化无法“一蹴而就”,而是“长跑冠军”。

CECBC区块链专委会

模式与重构-作业

秤须苑

当国产iVX遇上新晋产品PowerPlatform,能否披荆斩棘、稳住阵脚?

代码制造者

程序员 编辑器 低代码 快速开发 开发工具

拥抱开源开放,易观技术开发者的星海征途

易观大数据

海豚调度 调度引擎

腾讯的辣酱不香了 支付宝的区块链真能解决“萝卜章”问题?

CECBC区块链专委会

双链通 萝卜章 区块链方案

ARTS-week5

王钰淇

ARTS 打卡计划

再有人问你分布式事务,把这篇扔给他

码哥小胖

分布式 Java 分布式

架构0期Week4作业1

Nan Jiang

最详细的 Spring Cloud OAuth2 单点登录使用教程送给大家

小闫

后端 JVM Java 面试 SpringCloud

锦囊篇|一文摸懂SharedPreferences和MMKV(一)

ClericYi

起底印度禁用59款应用的数据表现

谢锐 | Frozen

移动应用 游戏开发 游戏出海 移动互联网 游戏制作

终于有大佬把TCP/IP协议讲清楚了!面试再也不怂面试官提问了

小闫

jdk JVM Netty buffer TCP/IP

【自学成才系列一】multipass安装篇

小朱

multipass

Java面试常用知识(附赠最新面试题)

架构大数据双料架构师

写给孩子的两本书我读得津津有味

孙苏勇

读书 陪伴 随笔杂谈

HDFS Federation-InfoQ