HDFS Federation

阅读数:80 2019 年 9 月 27 日 11:17

HDFS Federation

1 背景

当前 Hadoop 集群版本是 2.7.3, 社区最新 Hadoop 生产版本是 Hadoop3.2.0(rbf1.0), 随着集群规模及数据量的增加,单 NameNode 产生了性能及稳定性瓶颈:

  • 堆内存:NameNode 堆内存限制,目前文件目录 3.15 亿,块 3.05 亿,(namespace 占 64G,blockmap 占 54G,约 118G),namenode 堆内存设置 180G

  • 性能:越来越高的读写并发,NameNode 的粗粒度元数据锁,RPC QPS 成为瓶颈,集群能力受限单一 NameNode 限制,经过服务端口业务端口分开,callqueue 等优化,namenode 响应,Balance 和 Mover 速度较慢

  • 除此之外,重启 Namenode 时间较长 (小时级) 也给集群运维工作带来不便

  • 解决方案 federation+viewfs+fastcopy

2 大数据架构

HDFS Federation

3 基本概念

Federation

为了 namespace 横向扩展,federation 使用多个各自独立的 Namenodes/namespaces.。这些 Namenodes 组成联邦组织,每个 namenode 之间不需要交流.。所有的 namenode 共享所有的 datanode 作为存储,每一个 datanode 都在集群中所有的 namenode 注册自己的信息.,而且 Datanodes 需要阶段性地向所有的 namenode 发送心跳信息和块信息报告,处理来自 namenode 的命令。

多 namenode 拥有一个集群的编号 clusterid,如果某个 namenode 格式化时,不是使用相同的 clusterid 说明处于不同的集群。Block pool(块池) 就是属于单个命名空间的一组 block(块)。Datanode 是一个物理概念,而 block pool 是一个重新将 block 划分的逻辑概念,同一个 datanode 中可以存着属于多个 block pool 的多个块。

HDFS Federation

Viewfs

视图文件系统(View File System ,ViewFs)提供了管理多个 Hadoop 文件系统命名空间的方式,该系统在 HDFS federation 的集群中有多个 NameNode(因此有多个命名空间)是特别有用。

mounttable

目录挂载配置表, 我们做到 hdfs 目录和 viewfs 目录一致。

HDFS Federation

Fastcopy

原理:

  • 查询待拷贝文件的所有 block 块信息。
  • 获取这些源文件块信息所在的位置信息。
  • 对于源文件的每个块,在 NameNode 内部对应创建一个空的目标块,这些目标块的存储位置尽可能与源块一致。
  • 然后命令 DataNode 做一个本地的块拷贝操作 (硬链接)。
  • 然后等待块拷贝完成操作,然后上报到 NameNode 上。
复制代码
二次开发:
由于 HDFS-2139 是单客户端执行方式,相比 distcp 有数十倍的性能提升,但对于拷贝上亿文件快,几 P 的数据速度仍然无法满足需求,将 fastcp 改为分布式模式执行。
hadoop distfastcp -strategy dynamic -prbugpcaxt -update -skipcrccheck -i -delete
hdfs://ip:9000/$1 hdfs://ip:9000/$1
我们将 distcp 数据拷贝逻辑改造为 fastcp 逻辑,将 distcp 中的 CopyMapper 中的 copyFileWithRetry 改为调用 fastcopy
FastCopy.CopyResult c = fcp.copy(sourceFileStatus.toString(), target.toString(),
(DistributedFileSystem) srcFileSys,(DistributedFileSystem) targetFS);
解决 map 数据倾斜(利用 dynamic strategy, 同时将 UniformSizeInputFormat 的 getSplits 按照文件个数划分)
解决文件更新覆盖删除等问题
扩展 acl 属性及存储策略的拷贝
BlockStoragePolicy[] policies = ((DistributedFileSystem)fileSystem).getStoragePolicies();
HdfsFileStatus status = ((DistributedFileSystem)fileSystem).getClient().getFileInfo(fileStatus.getPath().makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()).toUri().getPath());
byte storagePolicyId = status.getStoragePolicy();
for (BlockStoragePolicy p : policies) {
if (p.getId() == storagePolicyId) {
copyListingFileStatus.setStoragePolicy(p.getName());
}
}
if(srcFileStatus.getStoragePolicy()!=null) {
((DistributedFileSystem) targetFS).setStoragePolicy(path, srcFileStatus.getStoragePolicy());
}
多线程列表生成器,如果分区文件较大,单线程列表生成器成为瓶颈导致任务启动慢
if (explore) {
CountDownLatch countDownLatch = new CountDownLatch(sourceFiles.length);
ExecutorService listFileExecutor= Executors.newFixedThreadPool(100);
for (FileStatus sourceStatus : sourceFiles) {
listFileExecutor.execute(new ListFile(countDownLatch,sourceFS,sourceStatus,sourcePathRoot,options,preserveAcls,preserveXAttrs,preserveRawXAttrs));
}
countDownLatch.await();
listFileExecutor.shutdown();
}

以下为 fastcp 和 distcp 的效率对比:

HDFS Federation

4Balance&Mover

balancer 是当 hdfs 集群中一些 datanodes 的存储要写满了或者有空白的新节点加入集群时,用于均衡 hdfs 集群磁盘使用量的一个工具 mover 这个工具用户归档数据,它类似于 Balancer(移动数据方面)。

MOVER 定期扫描 HDFS 文件,检查文件的存放是否符合它自身的存储策略。如果数据块不符合自己的策略,它会把数据移动到该去的地方。fastcp 是通过硬链接的方式来加速数据的拷贝,但同时会占用磁盘的大小,fastcp 前需要保证 datanode 各磁盘有充足的磁盘空间,避免由于空间不足带来的拷贝降级为物理数据拷贝。

由于我们使用了基于 ZFS 的透明压缩方案,即 datanode 设置一定比例的 ZFS 盘,通过 hdfs 的冷热数据分层存储来达到对用户透明的压缩方案。mover 进程的长时间运行使一些 zfs 磁盘空间早已达到 98%,经过一段时间的实验单纯通过优化 balance 命令无法降低这些 zfs 磁盘的空间使用率,因为 mover 程序优先选择在同一 datanode 内移动数据,当 balance 程序释放一定的磁盘空间后,mover 程序又会将磁盘空间打满,最后我们通过限定 mover 程序 datanode 范围的方式来降低 zfs 磁盘的空间使用率,同时 balance 程序区分不同的存储策略来 balance 数据,当高磁盘空间占用小于 85% 后我们再进行 fastcp。

针对不同存储策略的 balance:

复制代码
for (DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
LOG.info("DFSConfigKeys.DFS_BALANCE_ONLY_DISK :"+ typeStore );
if (this.typeStore) {
for (StorageType t : StorageType.getMovableTypes()) {
if ("DISK".equals(t.name())) {
...
dispatcher.getStorageGroupMap().put(g);
}
}
} else {
for (StorageType t : StorageType.getMovableTypes()) {
...
}
}
mover 限定磁盘占用率高 server:
static HashSet<String> getNameNodePathsToMove(Configuration conf,
String... args) throws Exception {
final Options opts = buildCliOptions();
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args, true);
return getNameNodePaths(commandLine, conf);
}
private static HashSet<String> getNameNodePaths(CommandLine line,
Configuration conf) throws Exception {
HashSet hashSet = new HashSet();
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(line.getOptionValue("d")), "UTF-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
hashSet.add(line1);
}
}
} finally {
IOUtils.cleanup(LOG, reader);
}
return hashSet;
}

HDFS Federation
HDFS Federation

5 测试

** 功能 **

HDFS Federation

** 兼容性 **

HDFS Federation

6 相关 Patch

https://issues.apache.org/jira/browse/HADOOP-12253

https://issues.apache.org/jira/browse/HIVE-11364

(hive 配置, hive.exec.stagingdir)

https://issues.apache.org/jira/browse/HIVE-10790

(orc 表空目录读取,新分区写入,spark 依赖此包)

https://issues.apache.org/jira/browse/HIVE-11920

(add jar failing with url schemes)

https://issues.apache.org/jira/browse/HDFS-2139

(fastcopy,需更新 datanode 代码 hardLink)

7 数据划分方式

做好合理的数据划分方式是解决问题的关键,主要由两种方式:

  • 根据 hdfs 目录划分
  • 根据表的分区划分

根据 hdfs 目录划分,可以对 namenode 流量进行分流,做到业务数据分割,但需要对客户端有极强的掌控能力,拷贝数据时需要较长的时间,集群停服务时间达数小时以上。根据表的分区划分,可以将历史冷数据拷贝至单独的 namenode,但会导致 mounttabl 异常复杂,历史数据一旦变更会有较大的风险。

结合数据特点和业务需求及 namenode 的瓶颈,我们决定将 stg 层数据迁移至新的 namenode,目前 stg 层数据主要有三种接入方式,flume,sqoop,databus(贝壳找房自研数据流转工具),我们对这三个工具有较强的掌控力。迁移时再根据分区进行分批迁移,来解决集群长时间停服务对实时任务及 etl 任务的影响,做到平稳迁移。同时我们也把 yarn.log-aggregation 目录迁移至新的 namenode。

8 域名格式兼容性

通过运营手段解决 viewfs 兼容性问题:

  • hdfs:///user
  • hive 创建 temproty 表
  • load data inpath(sqoop)
  • flume 日志收集文件无法正常关闭
  • hivemeta 建表兼容性 (迁移数据过度时期)

9Viewfs 上线步骤

  • 根据 hdfs-audit 确定读写 stg 数据客户端,并进行客户端 viewfs 升级 (hadoop,hive,tez,spark,flume,sqoop)
  • 解决客户端程序 viewfs 兼容性问题
  • 修改 flume 导数据配置将 tmp 目录修改至新的 namenode 目录
  • stg 层历史数据分阶段 mv 至新目录进行 fastcp 同步数据
  • 修改 hive 对应表的元数据信息

10 总结 & 展望

federation 一期我们主要迁移 stg 层数据,可降低 namenode 内存 30%,后续随着数据的增加我们再进行不同业务间的数据迁移 viewfs 需要对 hadoop 客户端有一定的掌控能力,我们也会对 RBF 进行调研引入,屏蔽 mounttable 的修改对客户端的影响,RouterFederation 每个 NameNode 作为一个子集群挂载到 RF 层,挂载信息保存在 Zookeeper 中。

RF 可以具有多个 DfsRouter 进程,负责接收用户的请求,并从 Zookeeper 中获取 Mount 信息。同时,每个 NameNode 会定时向 RF 发送心跳信息,RF 会感知 NameNode 的存活以及在 HA 状态下的 Active 状态。

HDFS Federation

作者介绍:
数大大(企业代号名),贝壳找房大数据工程师,大数据架构团队成员,目前主要负责 hadoop,hive 集群。

本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。

原文链接:

https://mp.weixin.qq.com/s/ke5IBpHERWWwfxvsblMqOQ

评论

发布