使用 Sqoop 实现 RDS MySQL 到 Redshift 的数据同步

阅读数:25 2019 年 11 月 18 日 08:00

使用Sqoop实现RDS MySQL到Redshift的数据同步

希腊有一个著名的谷堆悖论。“如果1粒谷子落地不能形成谷堆,2粒谷子落地不能形成谷堆,3粒谷子落地也不能形成谷堆,依此类推,无论多少粒谷子落地都不能形成谷堆。但是,事实并非如此。”
这个悖论说的,就是告诉我们量变产生质变,需要一个明显的分割线。如果说,量是一个量化的数据,质是一个结论的话。那么,数据分析做的,就是要分析量,从而引向“定性”、”定质”。定量的了解历史的规律(“质”),从而预测未来。
近几年,大数据风靡全球,越来越多的企业利用 MapReduce,Hive,Spark 等计算框架和工具来为自身的业务提供帮助,在 AWS 上,我们也提供了诸多的服务,帮助用户能够快速地构建起适合自身需求的大数据分析架构,其中,Amazon Redshift 是性能优异并且完全托管的 PB 级别数据仓库服务,提供了标准 SQL 数据库访问接口,并且可以十分方便地与现有的主流商业智能数据分析工具整合,构建企业级数据仓库。

然而,大部分企业的核心数据都存储在关系型数据库中,如何能够有效地将这部分存量数据以及后续的增量数据导入 Redshift 中呢?本文介绍一种使用开源的 Apache Sqoop 工具,帮助我们轻松实现这一过程。

配置步骤:

第一步 准备工作

1.1 修改 MySQL 中的表结构

为了能够实现增量同步,需要在 MySQL 表中增加一列时间戳,该列能够自动记录行被插入更新的时间
为了能够实现同步删除操作,需要在 MySQL 表中增加一列删除记号列,应用对数据库的删除通过标记该列完成,而不是通过传统的 delete 语句,因为通常对于曾经存在过的数据,也有分析的意义

本例需要同步的表为 country,orders,user,其中 country 表为 Mycat 中的全局表,在两台 RDS mysql1 和 mysql2 中都有全部信息,orders 和 user 表为 Mycat 中的分片表,信息分布在 RDS mysql1 和 mysql2 中

mycat_sequence 表是用于记录其他表自增字段信息的功能表,无需同步到 Redshift 中分析

使用Sqoop实现RDS MySQL到Redshift的数据同步

执行如下语句添加两列

alter table country add ifdelete boolean NOT NULL default 0;
alter table country add lastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMEST AMP;

使用Sqoop实现RDS MySQL到Redshift的数据同步

1.2 创建 EMR 集群

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

注意勾选上 Hive 和 Sqoop,同时目前 AWS EMR 最新的版本为 5.4.0,其中对一些组件的版本进行了更新,不过 Hive 和 Sqoop 的版本与本文一致

使用Sqoop实现RDS MySQL到Redshift的数据同步

注意选择相应的 VPC 和子网,子网需要有 internet 的路由方便之后 ssh 登入

使用Sqoop实现RDS MySQL到Redshift的数据同步

选择登入的密钥对,Master 安全组使用默认的 ElasticMapReduce-master,不用修改

使用Sqoop实现RDS MySQL到Redshift的数据同步

启动 EMR 集群后,修改 Master 节点的安全组,添加允许公网 ssh 访问

使用Sqoop实现RDS MySQL到Redshift的数据同步

在 EMR 界面获取 master 节点 ssh 登入的信息

使用Sqoop实现RDS MySQL到Redshift的数据同步

1.3 创建 Redshift 数据仓库

首先创建 Redshift 使用的安全组,放行所有源访问 5439 端口的权限

使用Sqoop实现RDS MySQL到Redshift的数据同步

分别在 cn-north-1a 和 cn-north-1b 两个可用区中创建两个子网给 Redshift 使 用,由于之后会通过公网连接 Redshift,这两个子网需要有到 internet 的路由

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

在 Redshift 中创建子网组,选上之前创建的两个子网组

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

创建 Redshift 参数组

使用Sqoop实现RDS MySQL到Redshift的数据同步

创建 Redshift 集群实例

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

选择之前创建的参数组,VPC,子网组和安全组,开启公网访问

使用Sqoop实现RDS MySQL到Redshift的数据同步

获取连接 Redshift 的 JDBC 驱动及连接的 URL 信息

使用Sqoop实现RDS MySQL到Redshift的数据同步

驱动如果无法下载,也可以从如下连接下载

https://s3.cn-north-1.amazonaws.com.cn/junyublog/RedshiftJDBC41-1.1.17.1017.jar

1.4 创建并保存 access key 和 secret access key

之后从 S3 中同步数据到 Redshift 时需要提供 access key 和 secret access key 信息,这边测试时可以全部放开权限

在 IAM 中增加一个用户并赋予权限

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

下载存有 access key 和 secret access key 的 CSV 文件

使用Sqoop实现RDS MySQL到Redshift的数据同步

1.5 创建 S3 的 bucket 桶

S3 会作为 Hive 表的底层存储

使用Sqoop实现RDS MySQL到Redshift的数据同步

第二步 创建 Hive 表

Hive 表为 RDS 到 Redshift 数据同步的中间表,底层使用 S3 作为存储,另外由于 Hive 的表名不能是 user,这里使用 users

使用Sqoop实现RDS MySQL到Redshift的数据同步

exit; 退出 hive

第三步 安装 MySQL JDBC 驱动 (可选)

下载安装 JDBC 驱动, 最新版的 EMR 不需要,如果在运行 Sqoop 的时候报找不到驱动时需要手动安装

ssh 登入 EMR 的 master 节点

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.40.tar.gz

tar xzvf mysql-connector-java-5.1.40.tar.gz

cp mysql-connector-java-5.1.40/ mysql-connector-java-5.1.40-bin.jar /usr/bin/sqoop/lib/

第四步 修改 java 权限,否则执行 Sqoop job 会有 warning 和 error

vim /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.29.amzn1.86_64/jre/lib/security/java.policy
在 grant{}中添加如下语句

permission javax.management.MBeanTrustPermission “register”;

第五步 配置 Sqoop

5.1 创建 Sqoop 访问数据库的密码,XXXXXX 为创建 RDS mysql1 和 mysql2 时赋予的账号密码

echo –n “XXXXXX” > /home/hadoop/sqoop.password

5.2 创建并执行 Sqoop 任务

其中由于 country 表是全局表,所以这里只是从 mysql1 的 read replica 读副本中同步,而 user 和 orders 表由于是分片表,所以需要分别从 mysql1 和 mysql2 各自的读副本中同步数据
需要注意修改如下指令中的 URL 为自己 RDS 读副本的 URL,同时,对于 user 和 orders,两条 sqoop job 是不同的,第一条 job 中通过 hive-overwrite 参数覆盖上一次 job 执行后遗留在 Hive 表中的数据,第二条 job 是没有 hive-overwrite 参数的,否则会把上一条 job 从 mysql1 中同步的数据错误地删除

使用Sqoop实现RDS MySQL到Redshift的数据同步

下面进行第一次同步,分别执行如下命令将 RDS 中的数据同步到 Hive 表中,第一次执行是全备,根据表中数据量,时间可能较长

sqoop job –exec mysql1_country

sqoop job –exec mysql1_user

sqoop job –exec mysql2_user

sqoop job –exec mysql1_orders

sqoop job –exec mysql2_orders

进入 Hive,查看表是否同步成功

使用Sqoop实现RDS MySQL到Redshift的数据同步

第六步 将 Hive 表中的数据同步到 Redshift 中

使用 JDBC 客户端连接 Redshift,这里使用 SQL Workbench
分别创建 country,user,orders 表及各自的中间表,同时将 Hive 存在 S3 中的数据同步到中间表中,其中 aws_access_key_id 和 aws_secret_access_key 为准备工作中在 IAM 下载的 CSV 中的值

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

查看 stage 表中的信息是否同步正确

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

通过如下事务插入更新 country,users,orders 表,并删除中间表

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

查看数据是否正确插入更新到 country,users,orders 表中

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

第七步 执行增量同步

人为对 MySQL 中的表进行适当的增删改操作,本例对 country 表执行插入操作, 对 user 表执行插入和更新操作,对 orders 表执行删除操作,注意到时间戳为操作执行时的时间

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

ssh 登入 EMR 的 master 节点,再次运行 sqoop job 将 MySQL 中插入更新的数据同步到 Hive 表中
sqoop job –exec mysql1_country

sqoop job –exec mysql1_user

sqoop job –exec mysql2_user

sqoop job –exec mysql1_orders

sqoop job –exec mysql2_orders

在 Sqoop 执行输出中可以看到,sqoop job 会记录之前执行任务的时间,并调整 where 语句来实现增量同步数据,所以如果需要多次测试,需要删除 job(sqoop job –delete XXX) 并重新创建,这样会再次全量同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

进入 Hive,查看增量数据是否同步成功

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用 SQL Workbench 通过 JDBC 连接 Redshift,执行如下命令将增量数据同步到中间表

使用Sqoop实现RDS MySQL到Redshift的数据同步

执行如下事务将中间表的数据插入更新到 country,users,orders 表中

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

查看数据是否正确插入更新到 country,users,orders 表中

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

使用Sqoop实现RDS MySQL到Redshift的数据同步

之后在 Redshift 中的分析语句都可以通过添加 where ifdelete=false 排除删除的记录,同时可以定期删除 ifdelete 标记为 false 的记录,释放存储空间

作者介绍:

使用Sqoop实现RDS MySQL到Redshift的数据同步

余骏

AWS 解决方案架构师,负责基于 AWS 的云计算方案架构的咨询和设计,同时致力于 AWS 云服务在国内的应用和推广。在加入 AWS 之前,他在思科中国担任系统工程师,负责方案咨询和架构设计,在企业私有云和基础网络方面有丰富经验。

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布