使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

阅读数:1 2020 年 1 月 2 日 14:36

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

使用 AWS Glue Python Shell 构建

首先,在 AWS 管理控制台中导航到 AWS Glue

建立连接

Amazon Redshift 集群位于 VPC 中,因此您首先需要使用 AWS Glue 创建连接。连接包含访问数据存储所需的属性,包括 VPC 网络信息。您最终将此连接附加到 Glue Python Shell 作业,以便其可以到达 Amazon Redshift 集群。

从菜单栏中选择连接,然后选择添加连接。为您的连接指定一个名字(例如 _blog_rs_connection_),选择 Amazon Redshift 作为 连接类型,然后选择下一步,如以下屏幕截图所示。

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

集群下,输入 AWS CloudFormation 模板启动的集群的名称,即 _blogstack-redshiftcluster-####__。_ 因为我为此博客提供的 Python 代码已处理凭证检索,所以您在此处输入的围绕数据库信息的其余值大部分为占位符。您与连接关联的关键信息与网络相关。

请注意,如果没有正确的集群信息,将无法测试连接如果您对此感兴趣,请注意,在选择了正确的集群之后,将自动填充数据库名称用户名,如以下屏幕截图所示。请遵循此处的说明 Secrets Manager 中检索密码信息,然后将其复制到密码字段中。

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

ETL 代码审查

了解此示例中使用的两个主要的 Python 脚本:

Pygresql_redshift_common.py 是一组功能,可以从 Secrets Manger 中检索群集连接信息和凭据,建立与集群的连接,并分别提交查询。通过传递的参数检索运行时集群信息,这些功能使作业可以连接到有权限的任何集群。您可以按照说明创建 python .egg 文件(已作为 AWS CloudFormation 模板启动的一部分完成)将这些函数打包到库中。请注意,AWS Glue Python Shell 本机原生支持多个 python 库

Python

复制代码
import pg
import boto3
import base64
from botocore.exceptions import ClientError
import json
#uses session manager name to return connection and credential information
def connection_info(db):
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager'
)
get_secret_value_response = client.get_secret_value(SecretId=db)
if 'SecretString' in get_secret_value_response:
secret = json.loads(get_secret_value_response['SecretString'])
else:
secret = json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))
return secret
#creates a connection to the cluster
def get_connection(db,db_creds):
con_params = connection_info(db_creds)
rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % (con_params['host'], con_params['port'], db, con_params['username'], con_params['password'])
rs_conn = pg.connect(dbname=rs_conn_string)
rs_conn.query("set statement_timeout = 1200000")
return rs_conn
#submits a query to the cluster
def query(con,statement):
res = con.query(statement)
return res

调用时,AWS Glue Python Shell 作业运行 rs_query.py。它首先解析在调用时传递的作业参数。它使用其中一些参数从 S3 检索 .sql 文件,然后使用 pygresql_redshift_common.py 中的函数连接文件中的语句并将其提交给集群。因此,除了使用刚打包的 Python 库连接到任何集群之外,它还可以检索并运行任何 SQL 语句。这意味着您只需将参数传输到完成管道中每个任务应连接的位置和应提交的项目,即可为所有基于 Amazon Redshift 的 ETL 管理单个 AWS Glue Python Shell 作业。

Python

复制代码
from redshift_module import pygresql_redshift_common as rs_common
import sys
from awsglue.utils import getResolvedOptions
import boto3
#get job args
args = getResolvedOptions(sys.argv,['db','db_creds','bucket','file'])
db = args['db']
db_creds = args['db_creds']
bucket = args['bucket']
file = args['file']
#get sql statements
s3 = boto3.client('s3')
sqls = s3.get_object(Bucket=bucket, Key=file)['Body'].read().decode('utf-8')
sqls = sqls.split(';')
#get database connection
print('connecting...')
con = rs_common.get_connection(db,db_creds)
#run each sql statement
print("connected...running query...")
results = []
for sql in sqls[:-1]:
sql = sql + ';'
result = rs_common.query(con, sql)
print(result)
results.append(result)
print(results)

创建 Glue Python Shell 作业

接下来,将该代码付诸实践:

  1. 导航到 AWS Glue 控制台页面左侧菜单上的作业,然后从那里选择添加作业
  2. 为作业指定名称,例如 blog_rs_query
  3. 对于 IAM 角色,选择您先前在 AWS CloudFormation 控制台的资源部分中记下的相同 GlueExecutionRole
  4. 对于类型,选择 Python shell,将 Python version 保留为 Python 3 的默认值,并为此作业运行选择一个您提供的现有脚本
  5. 对于存储脚本的 S3 路径,请导航到 AWS CloudFormation 模板创建的脚本存储桶(在资源中查找 ScriptBucket),然后选择 python/py 文件。
  6. 展开安全性配置、脚本库和作业参数部分,将带有 Amazon Redshift 连接库的 Python .egg 文件添加到 Python 库路径。它也位于下 python /redshift_module-0.1-py3.6.egg 下的脚本存储区中。

完成上述所有操作后,所有内容应看上去与以下屏幕截图中的相同:

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

选择下一步。通过选择选择将其移动到必需连接下,添加您创建的连接。(从 _ 建立连接 _ 部分开始回想,这使作业能够与 VPC 进行交互。) 选择保存作业并编辑脚本完成操作,如以下屏幕截图所示。

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

测试驱动 Python Shell 作业

创建作业后,会转到 AWS Glue Python Shell IDE。如果一切顺利,您应该看到 rs_query.py 代码。现在,Amazon Redshift 集群进入空闲状态,因此请使用 Python 代码运行以下 SQL 语句并使用表格进行填充。

  1. 创建一个外部数据库 amzreviews_)_。

  2. 创建一个外部表评论 _)_,Amazon Redshift Spectrum 可以从该表中读取 S3 中的源数据(公共评论数据集)。该表按 product_category 进行分区,因为源文件是按类别进行组织的,但是通常您应该对经常筛选的列进行分区(请参阅 #4 )。

  3. 将分区添加到外部表。

  4. 创建一个本地内部表(评论_)到 Amazon Redshift 集群。product_id 作为 DISTKEY 使用时效果很好,因为它具有高基数,分布均匀,并且很可能(尽管不是此博客场景的明确组成部分)有一列将用于与其他表联接。我选择 review_date 作为 SORTKEY,以有效过滤掉不属于我的目标查询的评论数据(2015 年之后)。通过阅读设计表 _ 文档,了解有关如何最佳选择 DISTKEY/SORTKEY 以及其他表设计参数,进而优化性能的更多信息。

    SQL

复制代码
CREATE EXTERNAL SCHEMA amzreviews
from data catalog
database 'amzreviews'
iam_role 'rolearn'
CREATE EXTERNAL database IF NOT EXISTS;
CREATE EXTERNAL TABLE amzreviews.reviews(
marketplace varchar(10),
customer_id varchar(15),
review_id varchar(15),
product_id varchar(25),
product_parent varchar(15),
product_title varchar(50),
star_rating int,
helpful_votes int,
total_votes int,
vine varchar(5),
verified_purchase varchar(5),
review_headline varchar(25),
review_body varchar(1024),
review_date date,
year int)
PARTITIONED BY (
product_category varchar(25))
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://amazon-reviews-pds/parquet/';
ALTER TABLE amzreviews.reviews ADD
partition(product_category='Apparel')
location 's3://amazon-reviews-pds/parquet/product_category=Apparel/'
partition(product_category='Automotive')
location 's3://amazon-reviews-pds/parquet/product_category=Automotive'
partition(product_category='Baby')
location 's3://amazon-reviews-pds/parquet/product_category=Baby'
partition(product_category='Beauty')
location 's3://amazon-reviews-pds/parquet/product_category=Beauty'
partition(product_category='Books')
location 's3://amazon-reviews-pds/parquet/product_category=Books'
partition(product_category='Camera')
location 's3://amazon-reviews-pds/parquet/product_category=Camera'
partition(product_category='Grocery')
location 's3://amazon-reviews-pds/parquet/product_category=Grocery'
partition(product_category='Furniture')
location 's3://amazon-reviews-pds/parquet/product_category=Furniture'
partition(product_category='Watches')
location 's3://amazon-reviews-pds/parquet/product_category=Watches'
partition(product_category='Lawn_and_Garden')
location 's3://amazon-reviews-pds/parquet/product_category=Lawn_and_Garden';
CREATE TABLE reviews(
marketplace varchar(10),
customer_id varchar(15),
review_id varchar(15),
product_id varchar(25) DISTKEY,
product_parent varchar(15),
product_title varchar(50),
star_rating int,
helpful_votes int,
total_votes int,
vine varchar(5),
verified_purchase varchar(5),
review_date date,
year int,
product_category varchar(25))
SORTKEY (
review_date
);

手动执行此第一项作业,以便您可以看到我所讨论的所有元素在哪里发挥作用。选择 IDE 屏幕顶部的运行作业。展开安全性配置、脚本库和作业参数部分。您可以在此处将参数作为键值对添加,如以下屏幕截图所示。

col 1 col 2
–db reviews
–db_creds reviewssecret
–bucket
–file sql/reviewsschema.sql

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

选择运行作业以将其启动。完成该作业需要几秒钟。您可以在 IDE 中的代码下方查找日志输出,以观察作业进度。

作业完成后,请导航至 AWS Glue 控制台中的数据库,然后查找 amzreviews 数据库和 reviews 表,如以下屏幕截图所示。如果它们在该位置,则一切会按计划运行! 您还可以使用 Redshift 查询编辑器或您自己的 SQL 客户端工具连接到 Amazon Redshift 集群,并查找本地 _ 评论 _ 表。

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

本文转载自 AWS 技术博客。

原文链接: https://amazonaws-china.com/cn/blogs/china/orchestrate-amazon-redshift-based-etl-workflows-with-aws-step-functions-and-aws-glue/

评论

发布