写点什么

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

  • 2020-01-13
  • 本文字数:6409 字

    阅读完需:约 21 分钟

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

为不同工作负载优化数据结构

由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。

数据排列

例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:


SQL


SELECT   user,  COUNT(*) FROM   events_table WHERE   ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ GROUP BY   user;
复制代码


(假设 ‘ts’ 是为每个事件存储时间戳的列。)


利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。

高效创建 Parquet 数据

平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。


最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。


但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:


  • 从实例中收集事件数据。

  • 以 Parquet 格式保存数据。

  • 对数据进行有效分区。


为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:


  1. 将 S3 临时存储桶用作目标,以一分钟的间隔将数据从实例发送到 Kinesis Firehose 中。

  2. 聚合每小时的数据,并使用 AWS LambdaAWS Glue 将其转换为 Parquet。

  3. 通过更新表分区将 Parquet 数据添加到 S3。


在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。

数据验证

为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:


SQL


create external TABLE spectrum.blog_clicks (    user_id varchar(50),    campaign_id varchar(50),    os varchar(50),    ua varchar(255),    ts bigint,    billing float)按 (date date, hour smallint) 分区  存储为 parquet位置 's3://nuviad-temp/blog/clicks/';
复制代码


上面的语句定义了包括几个属性的新外部表(所有的 Redshift Spectrum 表为外部表)。我们将 ‘ts’ 存储为 Unix 时间戳,并非时间戳,计费数据存储为浮点数,而不是小数(稍后进行更多讨论)。我们还说过,数据按日期和小时进行分区,然后在 S3 上存储为 Parquet。


首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:


SQL


SELECT   * FROM   svv_external_columns WHERE   tablename = 'blog_clicks';
复制代码


此查询列出表中的所有列及其各自的定义:


col 1col 2col 3col 4col 5col 6
schemanametablenamecolumnnameexternal_typecolumnnumpart_key
spectrumblog_clicksuser_idvarchar(50)10
spectrumblog_clickscampaign_idvarchar(50)20
spectrumblog_clicksosvarchar(50)30
spectrumblog_clicksuavarchar(255)40
spectrumblog_clickstsbigint50
spectrumblog_clicksbillingdouble60
spectrumblog_clicksdatedate71
spectrumblog_clickshoursmallint82


现在,我们可以使用此数据为我们的数据创建验证模式:


Json


const rtb_request_schema = {    "name": "clicks",    "items": {        "user_id": {            "type": "string",            "max_length": 100        },        "campaign_id": {            "type": "string",            "max_length": 50        },        "os": {            "type": "string",            "max_length": 50                    },        "ua": {            "type": "string",            "max_length": 255                    },        "ts": {            "type": "integer",            "min_value": 0,            "max_value": 9999999999999        },        "billing": {            "type": "float",            "min_value": 0,            "max_value": 9999999999999        }    }};
复制代码


接下来,我们创建一个函数,以使用此模式验证数据:


JavaScript


function valueIsValid(value, item_schema) {    if (schema.type == 'string') {        return (typeof value == 'string' && value.length <= schema.max_length);    }    else if (schema.type == 'integer') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'float' || schema.type == 'double') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'boolean') {        return typeof value == 'boolean';    }    else if (schema.type == 'timestamp') {        return (new Date(value)).getTime() > 0;    }    else {        return true;    }}
复制代码

使用 Kinesis Firehose 进行近实时的数据加载

在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:


传输流名称:事件来源:Direct PUTS3 存储桶:nuviad-eventsS3 前缀:rtb/IAM 角色:firehose_delivery_role_1数据转换:已禁用源记录备份:已禁用S3 缓冲区大小 (MB):100S3 缓冲区间隔(秒):60S3 压缩:GZIPS3 加密:未加密状态:活动错误记录:已启用
复制代码


此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:


JavaScript


if (validated) {    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
let params = { DeliveryStreamName: 'events', Record: { Data: itemString } };
firehose.putRecord(params, function(err, data) { if (err) { console.error(err, err.stack); } else { // 继续您的下一步 } });}
复制代码


现在,我们有一个 CSV 文件,该文件表示 S3 中存储的一分钟事件数据。在向 S3 写入对象之前,Kinesis Firehose 通过添加一个格式为 YYYY/MM/DD/HH 的 UTC 时间前缀对文件进行自动命名。由于我们将日期和小时用作分区,我们需要更换文件的名称和位置,以适应我们的 Redshift Spectrum 模式。

使用 AWS Lambda 自动化数据分布

我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:


S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
复制代码


我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):


JavaScript


/*  事件对象中的对象密钥结构:your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz  */
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];let hour = key_parts[4];if (hour.indexOf('0') == 0) { hour = parseInt(hour, 10) + '';}
let parts1 = key_parts[5].split('-');let minute = parts1[7];if (minute.indexOf('0') == 0) { minute = parseInt(minute, 10) + '';}
复制代码


现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:


JavaScript


copyObjectToHourlyFolder(event, date, hour, minute)        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))        .then(deleteOldMinuteObjects.bind(null, event))        .then(deleteStreamObject.bind(null, event))                .then(result => {            callback(null, { message: 'done' });                    })        .catch(err => {            console.error(err);            callback(null, { message: err });                    });
复制代码


Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。


由于我们按日期和小时对数据进行分区,如果处理的分钟是一小时中的第一分钟(即分钟 0),我们在 Redshift Spectrum 表上创建了一个新分区。我们运行了以下各项:


SQL


ALTER TABLE   spectrum.events ADD 分区  (date='2017-08-01', hour=0)   LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
复制代码


处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。

使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet

我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。

创建 AWS Glue 作业

此简单 AWS Glue 脚本执行以下操作:


  • 获取待处理的作业、日期和小时参数

  • 创建 Spark EMR 上下文,以便我们运行 Spark 代码

  • 将 CSV 数据读取到 DataFrame 中

  • 将数据以 Parquet 格式写入目标 S3 存储桶

  • 为该表添加或修改 Redshift Spectrum / Amazon Athena 表分区


Python


import sysimport sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Jobimport boto3
## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"#hour_partition_key = "partition_1"#day_partition_value = "2017-08-01"#hour_partition_value = "0"
day_partition_key = args['day_partition_key']hour_partition_key = args['hour_partition_key']day_partition_value = args['day_partition_value']hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution( QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
response = client.start_query_execution( QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
job.commit()
复制代码


注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。


下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误,例如:


S3 查询异常(获取)。由于内部错误,任务失败。文件 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column ‘s3://nuviad-events/events.lat’.列类型:DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq


我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。

创建一个 Lambda 函数以触发转换

接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:


Python


import boto3import jsonfrom datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName='convertEventsParquetHourly', Arguments={ '--day_partition_key': 'date', '--hour_partition_key': 'hour', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
复制代码


使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/big-data-using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/


2020-01-13 14:53736

评论

发布
暂无评论
发现更多内容

挑战与机遇并存,华为云跨境电商新手入局必读指南来了

YG科技

小程序游戏风口有点“堵”?华为云耀云服务器L实例为企业疏难解困

YG科技

mac外接显示屏,下方程序坞跑到外接显示器上,怎么拖回mac里呢?

Rose

Mac

第27期 | GPTSecurity周报

云起无垠

物理机服务器优势

Geek_f19a80

服务器

3D渲染和动画制作:KeyShot 2023 Pro最新注册机

Rose

3D渲染 动画制作 KeyShot Pro破解版 KeyShot Pro 2023下载 KeyShot Pro注册机

诚意满满 颠覆认知丨华为云这款轻量应用服务总能带来惊喜

平平无奇爱好科技

深谙数字化奥义,华为云耀云服务器L实例助企业行稳致远.

平平无奇爱好科技

MathWorks Matlab R2023b Mac报错 License Manager Error -8

Rose

数学软件 Matlab R2023b破解补丁 Matlab R2023b报错

面向线上的springboot开发框架-Aradin

liudaac

springboot SpringCloud Alibaba spring-cloud java框架 脚手架

轻量应用服务器看花眼?华为云耀云服务器L实例“闭眼”选准没错

轶天下事

推荐15个苹果Mac用户都说好用的应用软件

Rose

Mac软件 苹果电脑 装机必备

CodeWhisperer 使用经验分享

亚马逊云科技 (Amazon Web Services)

人工智能 机器学习 云上探索实验室 Amazon CodeWhisperer

轻量应用服务器购买清单,双11看华为云这篇文章就够了

轶天下事

云上应用技术架构-LNMP应用

深蓝

Navicat Premium 15 for Mac(数据库开发工具)v15.0.36中文激活版

mac

Navicat Premium 苹果mac Windows软件 数据库管理软件

快get2023年跨境电商出海指南,华为云教你把握快速捞金机遇

YG科技

推翻企业ERP管理“三座大山”,华为云耀云服务器L实例言出必行

平平无奇爱好科技

当“996”开发模式都没用了,华为云这款轻量应用服务器才是终极答案

YG科技

Programming Abstractions in C阅读笔记:p196

codists

职场“内卷”利器?华为云这款轻量应用服务器助力开发者效率翻倍

平平无奇爱好科技

浪潮云蝉联中国云原生安全市场第一位

云安全

数字化专精特新战略,华为云耀云服务器L实例全面赋能企业

平平无奇爱好科技

mac电脑OCR文字识别推荐 Cisdem OCRWizard激活最新

mac大玩家j

Mac软件 ocr识别工具

如何快速配置一个新的SpringBoot项目

Lahm Chen

Spring Frame

基于 Letterize.js + Anime.js 实现炫酷文本特效

南城FE

CSS JavaScript 前端开发 动画 文字动画

Mac电脑批量重命名:A Better Finder Rename 12 免激活

胖墩儿不胖y

Mac软件 重命名工具 文件夹重命名软件

Linux常用命令用法及实现方式

小齐写代码

小程序开发还犯迷糊?快戳这份华为云实用技巧效率翻倍

YG科技

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章