NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

  • 2019-11-28
  • 本文字数:4459 字

    阅读完需:约 15 分钟

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。


我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:


  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。

  • 在您必须处理指定数据插入和更新事件时,处理流数据。

  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。

  • 恢复延迟到达的数据,或分析截至特定时间点的数据。


从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。


使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。


启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。


Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:


  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。

  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。


我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。


**将 Apache Hudi 与 Amazon EMR 结合使用


**我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。


当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:


Bash


$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"              --conf "spark.sql.hive.convertMetastoreParquet=false"              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码


在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:


Scala


import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor
//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
//从 S3 中读取数据并使用分区和记录密钥创建 DataFrameval inputDF = spark.read.format("parquet").load(inputDataPath)
//将数据写入 Hudi 数据集中inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)
复制代码


在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:


scala> inputDF2.count()


res1: Long = 10491958


在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:


hive> use default;


hive> select count(*) from elb_logs_hudi_cow;


...


OK


10491958


...


现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:


Scala


val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码


我执行 SQL 语句来查看列的当前值:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_003|


+------------+


然后,我选择并更新记录:


Scala


//使用单个记录创建 DataFrame 并更新列值val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)                      .withColumn("elb_name", lit("elb_demo_001"))
复制代码


现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:


Scala


//将 DataFrame 编写为现有 Hudi 数据集的更新updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我检查了更新的结果:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:


Scala


//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,                "org.apache.hudi.EmptyHoodieRecordPayload")        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我发现记录不再可用:


scala> spark.sql(sqlStatement).show()


+--------+


|elb_name|


+--------+


+--------+


Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:



此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。


使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:


hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031


在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:


DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"


如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。


创建“读取时合并”数据集时,会创建两个 Hive 表:


  • 第一个表与数据集的名称匹配。

  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time


查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。


**现已推出


**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/


2019-11-28 08:001088

评论

发布
暂无评论
发现更多内容

云起无垠参编中国信通院《软件供应链安全能力中心建设指南》正式发布

云起无垠

基于 ACK Fluid 的混合云优化数据访问(四):将第三方存储目录挂载到 Kubernetes,提升效率和标准化

阿里巴巴云原生

阿里云 Kubernetes 容器 云原生 ACK

AI & Web3 盛会「EDGE」在港闭幕,融云国际影响力持续提升

融云 RongCloud

AI 技术 edge 峰会 web3

软件测试/测试开发/校招推荐 |中科创达软件股份有限公司岗位开放

测试人

程序员 软件测试 招聘 测试开发 内推

大咖汇聚,KaiwuDB 邀您共话 AI+ DB 发展新趋势

KaiwuDB

KaiwuDB AIDB

软件测试/测试开发丨基于人工智能的代码分析与 Bug 检测实战

测试人

人工智能 程序员 软件测试 自动化测试 测试开发

营销黑科技再升级:百度营销擎舵数字人4.0 带来哪些惊喜?

Geek_2d6073

DApp钱包OP链智能合约质押挖矿系统开发源码丨案例丨演示

l8l259l3365

产品经理视角 | API接口知识小结

Noah

产品经理 API接口文档 API产品经理

循环数组,一个可以释放无锁队列的力量

华为云开发者联盟

后端 开发 华为云 华为云开发者联盟

【论文解读】人工智能时代的科学发现

合合技术团队

人工智能 数据 科学

Cisdem Video Player for mac(高清视频播放器) v5.6.0完整激活版

mac

视频播放器 苹果mac Windows软件 Cisdem Video Player

Redis魔法:点燃分布式锁的奇妙实现

互联网工科生

分布式锁 redis 底层原理

实现即时沟通与协作的全功能IM即时通讯系统

WorkPlus

产品经理的API文档阅读指南

Noah

产品经理 API接口文档

FISCO BCOS | 构建第一个区块链应用程序

BSN研习社

区块链 FISCO BCOS 区块链开发

@Scope 注解失效了?咋回事

江南一点雨

Java spring

白皮书商业计划书编写 了解白皮书在区块链稳定币开发中的作用

区块链软件开发推广运营

数字藏品开发 dapp开发 区块链开发 链游开发 NFT开发

深度学习之目标定位

矩视智能

机器视觉 深度学习、

自动化模式下,企业全面预算管理的提升

智达方通

自动化 全面预算管理 预测分析

出海方案全线升级!融云「爱嗨游」线上发布会定档

融云 RongCloud

互联网 全球化 融云 出海 GenAI

更现代化方式实现明暗模式,两行CSS搞定!

这我可不懂

CSS

Mac电脑屏幕分辨率修改 SwitchResX正式激活版

胖墩儿不胖y

Mac软件 屏幕分辨率工具 分辨率调整软件

GLB/GLTF在线纹理编辑

3D建模设计

GLTF glb 材质 纹理 贴图

再创新高!深开鸿OpenHarmony社区代码贡献量超过200万行!

Geek_2d6073

CogVLM:智谱AI 新一代多模态大模型

Geek_2d6073

快速gps定位:AnyGo 免激活最新中文版

mac大玩家j

Mac 软件 定位软件 gps虚拟定位

天谋科技时序数据库 IoTDB 与 openEuler 操作系统完成兼容性互认证

Apache IoTDB

提升代码重用性:模板设计模式在实际项目中的应用

树上有只程序猿

模板模式

9月活动回顾(免费领取PPT)|火山引擎DataLeap、ByteHouse多位专家带来DataOps、实时计算等前沿技术分享!

字节跳动数据平台

数据库 大数据 火山引擎 DataLeap 企业号10月PK榜

WorkPlus安全专属的企业IM助力政企高效协作

WorkPlus

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章