AI实践哪家强?来 AICon, 解锁技术前沿,探寻产业新机! 了解详情
写点什么

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

  • 2019-11-28
  • 本文字数:4459 字

    阅读完需:约 15 分钟

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。


我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:


  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。

  • 在您必须处理指定数据插入和更新事件时,处理流数据。

  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。

  • 恢复延迟到达的数据,或分析截至特定时间点的数据。


从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。


使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。


启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。


Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:


  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。

  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。


我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。


**将 Apache Hudi 与 Amazon EMR 结合使用


**我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。


当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:


Bash


$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"              --conf "spark.sql.hive.convertMetastoreParquet=false"              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码


在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:


Scala


import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor
//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
//从 S3 中读取数据并使用分区和记录密钥创建 DataFrameval inputDF = spark.read.format("parquet").load(inputDataPath)
//将数据写入 Hudi 数据集中inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)
复制代码


在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:


scala> inputDF2.count()


res1: Long = 10491958


在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:


hive> use default;


hive> select count(*) from elb_logs_hudi_cow;


...


OK


10491958


...


现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:


Scala


val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码


我执行 SQL 语句来查看列的当前值:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_003|


+------------+


然后,我选择并更新记录:


Scala


//使用单个记录创建 DataFrame 并更新列值val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)                      .withColumn("elb_name", lit("elb_demo_001"))
复制代码


现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:


Scala


//将 DataFrame 编写为现有 Hudi 数据集的更新updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我检查了更新的结果:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:


Scala


//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,                "org.apache.hudi.EmptyHoodieRecordPayload")        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我发现记录不再可用:


scala> spark.sql(sqlStatement).show()


+--------+


|elb_name|


+--------+


+--------+


Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:



此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。


使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:


hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031


在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:


DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"


如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。


创建“读取时合并”数据集时,会创建两个 Hive 表:


  • 第一个表与数据集的名称匹配。

  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time


查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。


**现已推出


**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/


2019-11-28 08:001310

评论

发布
暂无评论
发现更多内容

高效理解机器学习

俞凡

机器学习 算法

城市的智能进化,汇成数字中国的璀璨银河

脑极体

智慧城市

轻松处理pdf文件:Acrobat Pro DC 2023 中文激活版

真大的脸盆

Mac Mac 软件 PDF编辑 pdf编辑工具

Bash 脚本中,特殊变量$0到底是什么?

wljslmz

bash Linux 三周年连更

如何评价 ChatGPT 回答策略的 ensure only ethical usage 特质

汪子熙

ChatGPT ChatGPT4 三周年连更

极速上手使用Docker,这篇文章就够了!

浅羽技术

Java Docker centos 容器化 三周年连更

切片的其他妙用

宇宙之一粟

Go 切片 三周年连更

算法题每日一练:螺旋矩阵 I

知心宝贝

数据结构 算法 前端 后端 三周年连更

大模型“涌现”的思维链,究竟是一种什么能力?

脑极体

人工智能

我们如何将 Amazon Snowcone 送入轨道

亚马逊云科技 (Amazon Web Services)

通过华为云ECS云服务器搭建安防视频监控平台

DS小龙哥

三周年连更

挑战 30 天学完 Python:Day15 错误类型

MegaQi

挑战30天学完Python 三周年连更

读书笔记:如何成为某个领域的前1%

老张

读书笔记 方法 写作技巧

Java注解编译期处理AbstractProcessor详解

石臻臻的杂货铺

Java

深入探讨Go语言中Semaphore信号量的机制原理

Jack

挑战 30 天学完 Python:Day14 高阶函数

MegaQi

挑战30天学完Python 三周年连更

LoRA: 大语言模型个性化的最佳实践

Zilliz

Towhee 大语言模型

Go 方法接收器:选择值接收器还是指针接收器?

陈明勇

Go golang 方法 三周年连更 方法接收器

Matlab实现机器学习

袁袁袁袁满

三周年连更

《底层逻辑2:理解商业世界的本质》

石云升

读书笔记 三周年连更

手撕代码系列(三)

Immerse

JavaScript 前端面试题 手撕代码 ES6-ES12 面试必考

2023阿里云合作伙伴大会-主论坛回顾

科技pai

阿里云 伙伴大会 2023阿里云合作伙伴大会

CDH安装与部署

乌龟哥哥

三周年连更

Windows下 IDE工具常见编译错误FAQ

鸿蒙之旅

OpenHarmony 三周年连更

Qz学算法-数据结构篇(二分查找、删除)

浅辄

三周年连更

火山引擎云原生数据仓库ByteHouse技术白皮书V1.0 (Ⅵ)

字节跳动数据平台

大数据 数据仓库 云原生 元数据 企业号 4 月 PK 榜

Matlab实现最优化

Shine

三周年连更

2023-04-28:将一个给定字符串 s 根据给定的行数 numRows 以从上往下、从左到右进行 Z 字形排列 比如输入字符串为 “PAYPALISHIRING“ 行数为 3 时,排列如下 P

福大大架构师每日一题

Go 算法 rust 福大大

音视频八股文(8)-- h264 AnnexB

福大大架构师每日一题

音视频 ffmpeg 流媒体

云资源提供技术

阿泽🧸

云资源 三周年连更

爱在日落黄昏时 | 我有话要说

后台技术汇

三周年连更

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章