【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

  • 2019-09-26
  • 本文字数:6791 字

    阅读完需:约 22 分钟

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

点击流分析工具可以很好地处理数据,有些工具甚至具有令人印象深刻的 BI 界面。但是,孤立地分析点击流数据存在很多限制。例如,客户对您网站上的产品或服务感兴趣,却在您的实体店购买。点击流分析师会问:“他们浏览产品之后发生了什么?”,而商业分析师会问:“他们购买之前发生了什么?”


点击流数据可以增强您的其他数据源,这并不足为奇。如果结合购买数据使用,它有助于您确定要放弃购买的产品或优化营销支出。同样,它还可以帮助您分析线上和线下行为,甚至是客户在注册帐户之前的行为。但是,一旦点击流数据馈送的好处显现,您必须快速适应新的请求。


这篇文章介绍了我们在 Equinox Fitness Clubs 如何将数据从 Amazon Redshift 迁移到 Amazon S3,以便对点击流数据使用后期绑定视图策略。我们将在这篇文章中广泛讨论 Apache Spark、Apache Parquet、数据湖、hive 分区和外部表等有趣的内容!


当我们开始将点击流数据从其自有工具传送到我们的 Amazon Redshift 数据仓库时,速度是主要考虑事项。我们的初始用例是将 Salesforce 数据与 Adobe Analytics 数据合并在一起,以便深入了解我们的潜在客户开发流程。Adobe Analytics 可以告诉我们潜在客户来自哪些渠道和宣传活动、访问期间浏览了哪些网页以及是否在我们的网站上提交了潜在客户表单。Salesforce 可以告诉我们潜在客户是否符合条件、是否咨询过顾问以及最终是否注册会员。将这两个数据集合并在一起有助于我们更好地理解和优化我们的营销策略。


首先,我们来了解将 Salesforce 和 Adobe Analytics 数据集中到 Amazon Redshift 所涉及的步骤。但即使在 Redshift 中合并在一起,也需要一个通用标识符才能进行交互。第一步是在我们的网站上提交潜在客户表单时,生成 GUID 并将相同的 GUID 发送到 Salesforce 和 Adobe Analytics。



接下来,我们要将 Salesforce 数据传送到 Redshift。幸运的是,这些馈送已存在,因此我们可以在馈送中添加新的 GUID 属性并在 Redshift 中对其进行描述。


同样,我们必须生成从 Adobe Analytics 到 Amazon Redshift 的数据馈送。Adobe Analytics 提供了 Amazon S3 作为我们数据的目标选项,因此我们将数据传送给 S3,然后创建作业将其发送到 Redshift。此作业涉及到获取每日 Adobe Analytics 馈送,并附有一个含有数百列和数十万行的数据文件、一系列查找文件(如数据标题)和一个描述已发送文件的清单文件 – 然后以其原始状态全部传送至 Amazon S3。接着,我们使用 Amazon EMR 和 Apache Spark 将数据馈送文件处理为单个 CSV 文件,然后将其保存到 S3,以便我们执行 COPY 命令将数据发送到 Amazon Redshift。


此作业持续了几周时间,在我们开始频繁使用数据之前运行良好。虽然作业效率很高,但新列数据发生了日期回溯的情况(模式演变)。因此我们确定需要更大的灵活性,这是由数据的性质决定的。

数据湖补救

当我们决定重构作业时,我们做了两项准备工作。首先,我们逐渐开始采用更多的数据湖策略。其次,近期发布Redshift Spectrum。它使我们能在数据湖中查询点击流数据的平面文件,而无需运行 COPY 命令并将其存储在 Redshift 中。此外,我们可以更有效地将点击流数据合并到存储在 Redshift 中的其他数据源。


我们想利用自描述数据,该数据将数据模式与数据本身相结合。将数据转换为自描述数据有助于我们管理广泛的点击流数据集,并防止模式演变相关的挑战。我们可以将所需的每一列都放入数据湖文件中,然后只使用查询中重要的列以加快处理。为实现这种灵活性,我们使用的是 Apache Parquet 文件格式,因为其列式存储技术,该格式不仅具有自描述性,而且速度很快。我们在 Amazon EMR 上使用 Apache Spark 将 CSV 转换为 Parquet 格式,并对数据分区以获区扫描性能,如以下代码所示。



from datetime import date, timedeltafrom pyspark.sql.types import *from pyspark.sql import SparkSessionimport jsonimport argparse # Usage# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31
parser = argparse.ArgumentParser()parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')parser.add_argument('application', help='App name for job',type=str, default='XXX')args = parser.parse_args()
spark = SparkSession\ .builder\ .appName(args.application)\ .getOrCreate()
sc = spark.sparkContext
def manifest_toJSON(file, location): text = sc.textFile(file).collect() manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0} for x in text: if 'Lookup-File:' in x: manifest['lookup_files'].append(location+x.split(': ')[1]) elif 'Data-File: 01' in x: wildcard_path = x.replace('Data-File: 01','*') manifest['data_files'] += wildcard_path elif 'Record-Count:' in x: manifest['total_rows'] += int(x.split(': ')[1]) return manifest
# Create metadata by stitching together the file paths for# the header file and the data file from the manifest file# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)metadata = manifest_toJSON(manifest_filepath, base_filepath)
# Create a list of files and their data# Look specifically for the column_headers.tsv data# Split on \x00 to remove the garbage encoding and return a string of headerslookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\ .replace('\n', '')\ .replace('(', '')\ .replace(')', '')\ .replace(' ', '-')
# Create a schema for the list from the header file splitting on tabs# Cast everything as a string to avoid data type failuresschema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])
# Bypass RDD and write data file as a dataframe# then save as parquet to tie headers to their respective valuesdf = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)df.write.mode('overwrite').parquet(destination_filepath)
# Gracefully exit out of spark and this filesc.stop()exit()
复制代码


借助 AWS Glue Data Catalog,我们可以在 Amazon Redshift 和其他查询工具(如 Amazon Athena 和 Apache Spark)中查询可用的点击流数据。这是通过将 Parquet 文件映射到关系模式来实现的。AWS Glue 允许在几秒钟内查询其他数据。这是因为模式会实时发生更改。这意味着您可以同时完成列删除/添加、列索引重新排序和列类型更改。然后,就可以在保存模式后立即查询数据。此外,Parquet 格式可防止数据形状发生变化时出现故障,或者放弃和删除数据集中的某些列。


我们使用以下查询为 Adobe Analytics 网站数据创建了第一个 AWS Glue 表。我们在 SQL Workbench 的 Amazon Redshift 中运行了此查询。


--First create your schemacreate external schema omniture_prodfrom data catalog database 'omniture' iam_role 'arn:aws:iam:::role
--Then create your “table” CREATE EXTERNAL TABLE omniture_prod.eqx_web ( date_time VARCHAR, va_closer_id VARCHAR, va_closer_detail VARCHAR, va_finder_detail VARCHAR, va_finder_id VARCHAR, ip VARCHAR, domain VARCHAR, post_evar1 VARCHAR)STORED AS PARQUETLOCATION 's3://eqxdl-prod/omniture/eqx_web/'table properties ('parquet.compress'='SNAPPY');
--Check your databases, schemas, and tablesselect * from pg_catalog.svv_external_databases;select * from pg_catalog.svv_external_schemas;select * from pg_catalog.svv_external_tables;
复制代码


运行此查询后,我们通过 AWS Glue 界面根据请求对 schema 添加了其他列。我们还使用分区来加快查询和降低成本。



此时,我们的数据库中有个新的 schema 文件夹。它包含可以查询的外部表,但我们希望更进一步。我们需要增加一些数据转换,例如:


  • 将 ID 重命名为字符串

  • 连接值

  • 操作字符串,不包括我们从 AWS 发送的用来测试网站的 bot 流量

  • 更改列名称以方便使用。

  • 为此,我们创建了如下所示的外部表视图:


create view edw_t.f_omniture_web as select    REPLACE(dt, '-', '') as hive_date_key,    va_closer_id,    va_closer_detail as last_touch_campaign,    CASE        WHEN (va_closer_id) = '1' THEN 'Paid Search'        WHEN (va_closer_id) = '2' THEN 'Natural Search'        WHEN (va_closer_id) = '3' THEN 'Display'        WHEN (va_closer_id) = '4' THEN 'Email Acq'        WHEN (va_closer_id) = '5' THEN 'Direct'        WHEN (va_closer_id) = '6' THEN 'Session Refresh'        WHEN (va_closer_id) = '7' THEN 'Social Media'        WHEN (va_closer_id) = '8' THEN 'Referring Domains'        WHEN (va_closer_id) = '9' THEN 'Email Memb'        WHEN (va_closer_id) = '10' THEN 'Social Placement'        WHEN (va_closer_id) = '11' THEN 'Other Placement'        WHEN (va_closer_id) = '12' THEN 'Partnership'        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS last_touch_channel,    va_finder_detail as first_touch_campaign,    va_finder_id as va_finder_id,    CASE        WHEN (va_finder_id) = '1' THEN 'Paid Search'        WHEN (va_finder_id) = '2' THEN 'Natural Search'        WHEN (va_finder_id) = '3' THEN 'Display'        WHEN (va_finder_id) = '4' THEN 'Email Acq'        WHEN (va_finder_id) = '5' THEN 'Direct'        WHEN (va_finder_id) = '6' THEN 'Session Refresh'        WHEN (va_finder_id) = '7' THEN 'Social Media'        WHEN (va_finder_id) = '8' THEN 'Referring Domains'        WHEN (va_finder_id) = '9' THEN 'Email Memb'        WHEN (va_finder_id) = '10' THEN 'Social Placement'        WHEN (va_finder_id) = '11' THEN 'Other Placement'        WHEN (va_finder_id) = '12' THEN 'Partnership'        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS first_touch_channel,    ip as ip_address,    domain as domain,    post_evar1 AS internal_compaign,    post_evar10 as site_subsection_nm,    post_evar11 as IOS_app_view_txt,    post_evar12 AS site_section_nm,    post_evar15 AS transaction_id,    post_evar23 as join_barcode_id,    post_evar3 AS page_nm,    post_evar32 as host_nm,    post_evar41 as class_category_id,    post_evar42 as class_id,    post_evar43 as class_instance_id,    post_evar60 AS referral_source_txt,    post_evar69 as adwords_gclid,    post_evar7 as usersec_tracking_id,    post_evar8 as facility_id,    post_event_list as post_event_list,     post_visid_low||post_visid_high as unique_adobe_id,    post_visid_type as post_visid_type,    post_page_event as hit_type,    visit_num as visit_number,    visit_start_time_gmt,    post_evar25 as login_status,    exclude_hit as exclude_hit,    hit_source as hit_source,    geo_zip,    geo_city,    geo_region,    geo_country,    post_evar64 as api_error_msg,    post_evar70 as page_load_time,    post_evar78 as join_transaction_id,    post_evar9 as page_url,    visit_start_pagename as entry_pg,    post_tnt as abtest_campaign,    post_tnt_action as abtest_experience,    user_agent as user_agent,    mobile_id as mobile_id,    cast(date_time as timestamp) as date_time,    CONVERT_TIMEZONE(        'America/New_York', -- timezone of origin        (cast(            case             when post_t_time_info like '%undefined%' then '0'            when post_t_time_info is null then '0'            when post_t_time_info = '' then '0'            when cast(split_part(post_t_time_info,' ',4) as int) < 0              then left(split_part(post_t_time_info,' ',4),4)            else left(split_part(post_t_time_info,' ',4),3) end as int        )/60),        cast(date_time as timestamp)    ) as date_time_local,    post_t_time_info as local_timezonefrom omniture_prod.eqx_webwhere exclude_hit = '0'and hit_source not in ('5','7','8','9')
and domain <> 'amazonaws.com'and domain <> 'amazon.com'
WITH NO SCHEMA BINDING;

复制代码


现在,我们可以从 Amazon Redshift 执行查询,将我们的结构化 Salesforce 数据与半结构化动态 Adobe Analytics 数据相结合。通过这些更改,我们的数据变得极其灵活、对存储大小非常友好、查询特别高效。从那时起,我们开始将 Redshift Spectrum 用于很多用例,包括数据质量检查、机器数据、历史数据存档,使我们的数据分析师和科学家能够更轻松地混合和加载数据。


with web_leads as (  select transaction_id,last_touch_channel  from edw_t.f_omniture_web  where hive_date_key = '20170301'      and post_event_list like '%201%'      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'),opp_lifecycle as (  SELECT lifecycle,weblead_transactionid  FROM edw_t.f_opportunity  where weblead_transactionid is not null  and created_date_key between '20170220'and '20170310')select  web_leads.transaction_id,  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecyclefrom web_leadsleft join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

复制代码

小结

通过将 Amazon S3 数据湖与 Amazon Redshift 合并,我们能为点击流数据构建高效灵活的分析平台。从而无需始终将点击流数据加载到数据仓库中,并且使平台适应传入数据中 schema 更改。请阅读 Redshift Spectrum 入门文档,还可以在下面观看我们在 AWS Chicago Summit 2018 上的演讲。


相关文章:


如果觉得这篇文章有用,请务必阅读从数据湖到数据仓库:利用 Amazon Redshift Spectrum 增强 Customer 360 和 Narrativ 通过 Amazon Redshift 帮助创建者将其数字内容货币化。


作者介绍:


Ryan Kelly 是 Equinox 的数据架构师,帮助草拟和实施数据计划框架。他还负责点击流跟踪,帮助团队深入了解他们的数字计划。Ryan 热衷于让人们更方便地访问和提取他们的数据,以获取商业智能、执行分析和丰富产品/服务。他还喜欢探索和审查新技术,了解是如何改善他们在 Equinox 的工作。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/


2019-09-26 10:42519
用户头像

发布了 1835 篇内容, 共 91.7 次阅读, 收获喜欢 73 次。

关注

评论

发布
暂无评论
发现更多内容

数仓分层架构如何设计?

奔向架构师

数据库 数据仓库 数据架构

通用时区:你应该知道的数据库时区知识

华为云开发者联盟

数据库 时区 GaussDB(DWS) 通用时区 夏令时

自从有了这个工具,一键代码迁移不在话下

华为云开发者联盟

代码迁移 鲲鹏DevKit 汇编翻译 汇编语言 Kunpeng

2021年,想要成为年薪百万的Java架构师需要掌握哪些技术?

Java架构师迁哥

工作年限、成长路线、进阶技术。怎样才能成为架构师?

Linux服务器开发

Linux服务器开发 Linux后台开发 软件架构师 服务器架构师 C++架构师

【Linux】使用 systemd 管理 frp 服务

赖猫

Linux 后端

问题定位 | XtraBackup 8.0 数据重建避坑事件始末

RadonDB

MySQL Xenon XtraBackup

低代码助力企业生产管理8大招式,你学废(hui)了吗?

优秀

低代码

架构实战营 模块六:课后作业

👈

架构实战营

一份283页pdf,五大核心内容,熬夜“啃完”,竟拿下了阿里offer

Java 程序员 架构 面试

【签约计划】百位签约创作者名单公布

InfoQ写作社区官方

签约计划

为什么大家都在用WebRTC?

anyRTC开发者

音视频 WebRTC 语音通话 视频通讯

深度 | 字节跳动微服务架构体系演进

字节跳动 微服务 云原生 Service Mesh 服务网格 火山引擎

【小技巧】Google浏览器设置之Tab折叠分组

恒生LIGHT云社区

推荐 浏览器书签 谷歌 工具分享

GitHub 近两万 Star,无需编码,可一键生成前后端代码,这个开源项目有点强!

程序员生活志

开发5年!三面字节,成功拿到27k*17offer,原来也没那么难

Java 程序员 架构 面试

网络为本,博睿数据NPMD用20%的投入实现80%的功能

博睿数据

博睿数据 数据链DNA NPMD

ROS CDK | 云上资源自动化部署新模式

郭旭东

阿里云 ROS 基础设施即代码 IaC

从零开始学习3D可视化之拾取

ThingJS数字孪生引擎

大前端 可视化 3D 3D可视化 数字孪生

Flink Job 概览

Alex🐒

flink 翻译 flink1.13

微警务系统搭建,智慧派出所平台建设解决方案

墨奇,以“一手之力” 证明你就是你

E科讯

剪视频一点都不难,多款超实用剪辑软件全方位评测!

懒得勤快

短视频 视频剪辑 视频制作

架构实战营 模块六:学习总结

👈

架构实战营

体验为先,博睿数据打造以用户会话为中心的监测体系

博睿数据

博睿数据 数据链DNA DEM

双指针法

后台服务器开发

c++ 双指针 LeetCode

新版发布|ShardingSphere 5.0.0-beta 来了!

SphereEx

ShardingSphere

高寿命NVMe SSD应用场景探讨

怀瑾握瑜

区块链 数据库 云计算 SSD 虚拟货币

HTAP | MySQL 到 ClickHouse 的高速公路

RadonDB

MySQL Clickhouse Xenon

字节跳动亿级视频处理系统高可用架构实践

火山引擎开发者社区

架构 后端 音视频

Rust从0到1-泛型-生命周期

rust 泛型 生命周期 generic lifetimes

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章