2025上半年,最新 AI实践都在这!20+ 应用案例,任听一场议题就值回票价 了解详情
写点什么

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

  • 2019-11-28
  • 本文字数:4459 字

    阅读完需:约 15 分钟

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。


我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:


  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。

  • 在您必须处理指定数据插入和更新事件时,处理流数据。

  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。

  • 恢复延迟到达的数据,或分析截至特定时间点的数据。


从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。


使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。


启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。


Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:


  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。

  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。


我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。


**将 Apache Hudi 与 Amazon EMR 结合使用


**我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。


当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:


Bash


$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"              --conf "spark.sql.hive.convertMetastoreParquet=false"              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码


在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:


Scala


import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor
//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
//从 S3 中读取数据并使用分区和记录密钥创建 DataFrameval inputDF = spark.read.format("parquet").load(inputDataPath)
//将数据写入 Hudi 数据集中inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)
复制代码


在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:


scala> inputDF2.count()


res1: Long = 10491958


在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:


hive> use default;


hive> select count(*) from elb_logs_hudi_cow;


...


OK


10491958


...


现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:


Scala


val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码


我执行 SQL 语句来查看列的当前值:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_003|


+------------+


然后,我选择并更新记录:


Scala


//使用单个记录创建 DataFrame 并更新列值val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)                      .withColumn("elb_name", lit("elb_demo_001"))
复制代码


现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:


Scala


//将 DataFrame 编写为现有 Hudi 数据集的更新updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我检查了更新的结果:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:


Scala


//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,                "org.apache.hudi.EmptyHoodieRecordPayload")        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我发现记录不再可用:


scala> spark.sql(sqlStatement).show()


+--------+


|elb_name|


+--------+


+--------+


Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:



此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。


使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:


hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031


在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:


DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"


如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。


创建“读取时合并”数据集时,会创建两个 Hive 表:


  • 第一个表与数据集的名称匹配。

  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time


查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。


**现已推出


**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/


2019-11-28 08:001281

评论

发布
暂无评论
发现更多内容

深度学习平台百度飞桨亮相"十三五"科技创新成就展

百度大脑

人工智能 百度

1047 行 MySQL 详细学习笔记(值得学习与收藏),java基础面试题及答案整理

Java 程序员 后端

1年半经验,2本学历,Curd背景,学了阿里P8级架构师的7+1+1落地项目

Java 程序员 后端

2020最新阿里巴巴必问的200个面试题以及答案,助你斩获阿里offer

Java 程序员 后端

2021年九月最新Java面试必背八股文,338道最新大厂架构面试题

Java 程序员 后端

2021年面试会更难?Java必备209道真题,这份清单助你轻松入阿里

Java 程序员 后端

1000页神仙文档,连阿里P8面试官都说太详细了,面面俱到!搞懂这些直接P6+

Java 程序员 后端

10个知识点让你读懂Spring MVC容器,mysql主从复制原理

Java 程序员 后端

2021备战金三银四血拼一波算法:字节+百度,Java进阶推荐

Java 程序员 后端

2020年IT运维市场大前景到底怎么样,mysql数据库sql语句面试题

Java 程序员 后端

2021-07-22 Java练习题,kafka数据存储原理

Java 程序员 后端

2020金九银十面试总结,大厂Java面试必会知识点,基础+底层+算法+数据库

Java 程序员 后端

2021年金三银四最新美团、字节、阿里,阿里巴巴java面试流程

Java 程序员 后端

沃丰科技一体化平台 AI驱动数字与产业深度融合

海比研究院

130道BATJM真题及解析:集合+Spring,华为社招java面试题

Java 程序员 后端

2020金九银十面试总结,大厂Java面试必会知识点(1),java基础入门第二版第二章答案

Java 程序员 后端

10W字解析 SpringBoot技术内幕文档,实战+原理齐飞,java技术上难以解决的问题

Java 程序员 后端

2020年京东Java研发岗社招面经(面试经历+真题总结,java编程教程视频下载

Java 程序员 后端

更务实的联想,要做钢筋铁骨的边缘智能

脑极体

2021先定个小目标?搞清楚MyCat分片的两种拆分方法和分片规则!

Java 程序员 后端

2021全网最新、最全面“互联网大厂面试题库2400页,nginx反向代理负载均衡原理

Java 程序员 后端

2021年京东、拼多多、腾讯,javaspringboot面试题

Java 程序员 后端

2021年最新Java后端学习路线,适用于所有想要踏入Java行业的初学者!

Java 程序员 后端

【稳定性平台】GOREPLAY流量录制回放实战

得物技术

golang 得物 GOREPLAY 稳定性平台

2021年Java面试题抢先看,够全!中篇,java基础程序

Java 程序员 后端

2021年去一线大厂面试先过SSM框架源码这一关!,你还看不明白?

Java 程序员 后端

2021年备战金三银四:死磕“源码,百度网盘搜索引擎java

Java 程序员 后端

用四个问题引导员工解决问题

石云升

职场经验 管理经验 10月月更

10年Java开发经验,超过500人面试阿里的同学,总结出这108道面试题

Java 程序员 后端

2020年,阿里最新的java程序员面试题目含答案带你吊打面试官

Java 程序员 后端

2021年Java面试题抢先看,够全!,java技术支持面试题

Java 程序员 后端

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章