PCon全球产品创新大会开幕在即,查看大会全部精彩内容这里直达 了解详情
写点什么

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

  • 2019 年 9 月 26 日
  • 本文字数:6791 字

    阅读完需:约 22 分钟

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

点击流分析工具可以很好地处理数据,有些工具甚至具有令人印象深刻的 BI 界面。但是,孤立地分析点击流数据存在很多限制。例如,客户对您网站上的产品或服务感兴趣,却在您的实体店购买。点击流分析师会问:“他们浏览产品之后发生了什么?”,而商业分析师会问:“他们购买之前发生了什么?”


点击流数据可以增强您的其他数据源,这并不足为奇。如果结合购买数据使用,它有助于您确定要放弃购买的产品或优化营销支出。同样,它还可以帮助您分析线上和线下行为,甚至是客户在注册帐户之前的行为。但是,一旦点击流数据馈送的好处显现,您必须快速适应新的请求。


这篇文章介绍了我们在 Equinox Fitness Clubs 如何将数据从 Amazon Redshift 迁移到 Amazon S3,以便对点击流数据使用后期绑定视图策略。我们将在这篇文章中广泛讨论 Apache Spark、Apache Parquet、数据湖、hive 分区和外部表等有趣的内容!


当我们开始将点击流数据从其自有工具传送到我们的 Amazon Redshift 数据仓库时,速度是主要考虑事项。我们的初始用例是将 Salesforce 数据与 Adobe Analytics 数据合并在一起,以便深入了解我们的潜在客户开发流程。Adobe Analytics 可以告诉我们潜在客户来自哪些渠道和宣传活动、访问期间浏览了哪些网页以及是否在我们的网站上提交了潜在客户表单。Salesforce 可以告诉我们潜在客户是否符合条件、是否咨询过顾问以及最终是否注册会员。将这两个数据集合并在一起有助于我们更好地理解和优化我们的营销策略。


首先,我们来了解将 Salesforce 和 Adobe Analytics 数据集中到 Amazon Redshift 所涉及的步骤。但即使在 Redshift 中合并在一起,也需要一个通用标识符才能进行交互。第一步是在我们的网站上提交潜在客户表单时,生成 GUID 并将相同的 GUID 发送到 Salesforce 和 Adobe Analytics。



接下来,我们要将 Salesforce 数据传送到 Redshift。幸运的是,这些馈送已存在,因此我们可以在馈送中添加新的 GUID 属性并在 Redshift 中对其进行描述。


同样,我们必须生成从 Adobe Analytics 到 Amazon Redshift 的数据馈送。Adobe Analytics 提供了 Amazon S3 作为我们数据的目标选项,因此我们将数据传送给 S3,然后创建作业将其发送到 Redshift。此作业涉及到获取每日 Adobe Analytics 馈送,并附有一个含有数百列和数十万行的数据文件、一系列查找文件(如数据标题)和一个描述已发送文件的清单文件 – 然后以其原始状态全部传送至 Amazon S3。接着,我们使用 Amazon EMR 和 Apache Spark 将数据馈送文件处理为单个 CSV 文件,然后将其保存到 S3,以便我们执行 COPY 命令将数据发送到 Amazon Redshift。


此作业持续了几周时间,在我们开始频繁使用数据之前运行良好。虽然作业效率很高,但新列数据发生了日期回溯的情况(模式演变)。因此我们确定需要更大的灵活性,这是由数据的性质决定的。


数据湖补救

当我们决定重构作业时,我们做了两项准备工作。首先,我们逐渐开始采用更多的数据湖策略。其次,近期发布Redshift Spectrum。它使我们能在数据湖中查询点击流数据的平面文件,而无需运行 COPY 命令并将其存储在 Redshift 中。此外,我们可以更有效地将点击流数据合并到存储在 Redshift 中的其他数据源。


我们想利用自描述数据,该数据将数据模式与数据本身相结合。将数据转换为自描述数据有助于我们管理广泛的点击流数据集,并防止模式演变相关的挑战。我们可以将所需的每一列都放入数据湖文件中,然后只使用查询中重要的列以加快处理。为实现这种灵活性,我们使用的是 Apache Parquet 文件格式,因为其列式存储技术,该格式不仅具有自描述性,而且速度很快。我们在 Amazon EMR 上使用 Apache Spark 将 CSV 转换为 Parquet 格式,并对数据分区以获区扫描性能,如以下代码所示。



from datetime import date, timedeltafrom pyspark.sql.types import *from pyspark.sql import SparkSessionimport jsonimport argparse # Usage# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31
parser = argparse.ArgumentParser()parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')parser.add_argument('application', help='App name for job',type=str, default='XXX')args = parser.parse_args()
spark = SparkSession\ .builder\ .appName(args.application)\ .getOrCreate()
sc = spark.sparkContext
def manifest_toJSON(file, location): text = sc.textFile(file).collect() manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0} for x in text: if 'Lookup-File:' in x: manifest['lookup_files'].append(location+x.split(': ')[1]) elif 'Data-File: 01' in x: wildcard_path = x.replace('Data-File: 01','*') manifest['data_files'] += wildcard_path elif 'Record-Count:' in x: manifest['total_rows'] += int(x.split(': ')[1]) return manifest
# Create metadata by stitching together the file paths for# the header file and the data file from the manifest file# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)metadata = manifest_toJSON(manifest_filepath, base_filepath)
# Create a list of files and their data# Look specifically for the column_headers.tsv data# Split on \x00 to remove the garbage encoding and return a string of headerslookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\ .replace('\n', '')\ .replace('(', '')\ .replace(')', '')\ .replace(' ', '-')
# Create a schema for the list from the header file splitting on tabs# Cast everything as a string to avoid data type failuresschema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])
# Bypass RDD and write data file as a dataframe# then save as parquet to tie headers to their respective valuesdf = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)df.write.mode('overwrite').parquet(destination_filepath)
# Gracefully exit out of spark and this filesc.stop()exit()
复制代码


借助 AWS Glue Data Catalog,我们可以在 Amazon Redshift 和其他查询工具(如 Amazon Athena 和 Apache Spark)中查询可用的点击流数据。这是通过将 Parquet 文件映射到关系模式来实现的。AWS Glue 允许在几秒钟内查询其他数据。这是因为模式会实时发生更改。这意味着您可以同时完成列删除/添加、列索引重新排序和列类型更改。然后,就可以在保存模式后立即查询数据。此外,Parquet 格式可防止数据形状发生变化时出现故障,或者放弃和删除数据集中的某些列。


我们使用以下查询为 Adobe Analytics 网站数据创建了第一个 AWS Glue 表。我们在 SQL Workbench 的 Amazon Redshift 中运行了此查询。


--First create your schemacreate external schema omniture_prodfrom data catalog database 'omniture' iam_role 'arn:aws:iam:::role
--Then create your “table” CREATE EXTERNAL TABLE omniture_prod.eqx_web ( date_time VARCHAR, va_closer_id VARCHAR, va_closer_detail VARCHAR, va_finder_detail VARCHAR, va_finder_id VARCHAR, ip VARCHAR, domain VARCHAR, post_evar1 VARCHAR)STORED AS PARQUETLOCATION 's3://eqxdl-prod/omniture/eqx_web/'table properties ('parquet.compress'='SNAPPY');
--Check your databases, schemas, and tablesselect * from pg_catalog.svv_external_databases;select * from pg_catalog.svv_external_schemas;select * from pg_catalog.svv_external_tables;
复制代码


运行此查询后,我们通过 AWS Glue 界面根据请求对 schema 添加了其他列。我们还使用分区来加快查询和降低成本。



此时,我们的数据库中有个新的 schema 文件夹。它包含可以查询的外部表,但我们希望更进一步。我们需要增加一些数据转换,例如:


  • 将 ID 重命名为字符串

  • 连接值

  • 操作字符串,不包括我们从 AWS 发送的用来测试网站的 bot 流量

  • 更改列名称以方便使用。

  • 为此,我们创建了如下所示的外部表视图:


create view edw_t.f_omniture_web as select    REPLACE(dt, '-', '') as hive_date_key,    va_closer_id,    va_closer_detail as last_touch_campaign,    CASE        WHEN (va_closer_id) = '1' THEN 'Paid Search'        WHEN (va_closer_id) = '2' THEN 'Natural Search'        WHEN (va_closer_id) = '3' THEN 'Display'        WHEN (va_closer_id) = '4' THEN 'Email Acq'        WHEN (va_closer_id) = '5' THEN 'Direct'        WHEN (va_closer_id) = '6' THEN 'Session Refresh'        WHEN (va_closer_id) = '7' THEN 'Social Media'        WHEN (va_closer_id) = '8' THEN 'Referring Domains'        WHEN (va_closer_id) = '9' THEN 'Email Memb'        WHEN (va_closer_id) = '10' THEN 'Social Placement'        WHEN (va_closer_id) = '11' THEN 'Other Placement'        WHEN (va_closer_id) = '12' THEN 'Partnership'        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS last_touch_channel,    va_finder_detail as first_touch_campaign,    va_finder_id as va_finder_id,    CASE        WHEN (va_finder_id) = '1' THEN 'Paid Search'        WHEN (va_finder_id) = '2' THEN 'Natural Search'        WHEN (va_finder_id) = '3' THEN 'Display'        WHEN (va_finder_id) = '4' THEN 'Email Acq'        WHEN (va_finder_id) = '5' THEN 'Direct'        WHEN (va_finder_id) = '6' THEN 'Session Refresh'        WHEN (va_finder_id) = '7' THEN 'Social Media'        WHEN (va_finder_id) = '8' THEN 'Referring Domains'        WHEN (va_finder_id) = '9' THEN 'Email Memb'        WHEN (va_finder_id) = '10' THEN 'Social Placement'        WHEN (va_finder_id) = '11' THEN 'Other Placement'        WHEN (va_finder_id) = '12' THEN 'Partnership'        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS first_touch_channel,    ip as ip_address,    domain as domain,    post_evar1 AS internal_compaign,    post_evar10 as site_subsection_nm,    post_evar11 as IOS_app_view_txt,    post_evar12 AS site_section_nm,    post_evar15 AS transaction_id,    post_evar23 as join_barcode_id,    post_evar3 AS page_nm,    post_evar32 as host_nm,    post_evar41 as class_category_id,    post_evar42 as class_id,    post_evar43 as class_instance_id,    post_evar60 AS referral_source_txt,    post_evar69 as adwords_gclid,    post_evar7 as usersec_tracking_id,    post_evar8 as facility_id,    post_event_list as post_event_list,     post_visid_low||post_visid_high as unique_adobe_id,    post_visid_type as post_visid_type,    post_page_event as hit_type,    visit_num as visit_number,    visit_start_time_gmt,    post_evar25 as login_status,    exclude_hit as exclude_hit,    hit_source as hit_source,    geo_zip,    geo_city,    geo_region,    geo_country,    post_evar64 as api_error_msg,    post_evar70 as page_load_time,    post_evar78 as join_transaction_id,    post_evar9 as page_url,    visit_start_pagename as entry_pg,    post_tnt as abtest_campaign,    post_tnt_action as abtest_experience,    user_agent as user_agent,    mobile_id as mobile_id,    cast(date_time as timestamp) as date_time,    CONVERT_TIMEZONE(        'America/New_York', -- timezone of origin        (cast(            case             when post_t_time_info like '%undefined%' then '0'            when post_t_time_info is null then '0'            when post_t_time_info = '' then '0'            when cast(split_part(post_t_time_info,' ',4) as int) < 0              then left(split_part(post_t_time_info,' ',4),4)            else left(split_part(post_t_time_info,' ',4),3) end as int        )/60),        cast(date_time as timestamp)    ) as date_time_local,    post_t_time_info as local_timezonefrom omniture_prod.eqx_webwhere exclude_hit = '0'and hit_source not in ('5','7','8','9')
and domain <> 'amazonaws.com'and domain <> 'amazon.com'
WITH NO SCHEMA BINDING;

复制代码


现在,我们可以从 Amazon Redshift 执行查询,将我们的结构化 Salesforce 数据与半结构化动态 Adobe Analytics 数据相结合。通过这些更改,我们的数据变得极其灵活、对存储大小非常友好、查询特别高效。从那时起,我们开始将 Redshift Spectrum 用于很多用例,包括数据质量检查、机器数据、历史数据存档,使我们的数据分析师和科学家能够更轻松地混合和加载数据。


with web_leads as (  select transaction_id,last_touch_channel  from edw_t.f_omniture_web  where hive_date_key = '20170301'      and post_event_list like '%201%'      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'),opp_lifecycle as (  SELECT lifecycle,weblead_transactionid  FROM edw_t.f_opportunity  where weblead_transactionid is not null  and created_date_key between '20170220'and '20170310')select  web_leads.transaction_id,  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecyclefrom web_leadsleft join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

复制代码


小结

通过将 Amazon S3 数据湖与 Amazon Redshift 合并,我们能为点击流数据构建高效灵活的分析平台。从而无需始终将点击流数据加载到数据仓库中,并且使平台适应传入数据中 schema 更改。请阅读 Redshift Spectrum 入门文档,还可以在下面观看我们在 AWS Chicago Summit 2018 上的演讲。


相关文章:


如果觉得这篇文章有用,请务必阅读从数据湖到数据仓库:利用 Amazon Redshift Spectrum 增强 Customer 360 和 Narrativ 通过 Amazon Redshift 帮助创建者将其数字内容货币化。


作者介绍:


Ryan Kelly 是 Equinox 的数据架构师,帮助草拟和实施数据计划框架。他还负责点击流跟踪,帮助团队深入了解他们的数字计划。Ryan 热衷于让人们更方便地访问和提取他们的数据,以获取商业智能、执行分析和丰富产品/服务。他还喜欢探索和审查新技术,了解是如何改善他们在 Equinox 的工作。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/


2019 年 9 月 26 日 10:42321
用户头像

发布了 1511 篇内容, 共 56.2 次阅读, 收获喜欢 61 次。

关注

评论

发布
暂无评论
发现更多内容

github搜索技巧小结,深入理解JVM

Java 程序员 后端

Java agent还不了解的程序员该反省一下了,腾讯大牛教你自己写Java框架

Java 程序员 后端

Elasticsearch聚合学习之二:区间聚合,java中高级面试题大全

Java 程序员 后端

Java 之父:找Bug最浪费时间,现在不是开源的黄金时代

Java 程序员 后端

HCIE云计算--灾备,万字总结

Java 后端

Java 低代码开发平台“光”发布 2,springboot的工作原理图

Java 后端

第 2 周作业

波波

「架构实战营」

Elasticsearch聚合学习之五:排序结果不准的问题分析

Java 程序员 后端

IDEA开发Spark应用实战(Scala),java高级开发简历

Java 程序员 后端

jackson学习之六:常用类注解,java编程思想第五版电子书

Java 程序员 后端

Java 之类与对象,java零基础自学视频百度云

Java 程序员 后端

Elasticsearch查询速度为什么这么快?看啥?问你呢

Java 程序员 后端

HashMap底层实现原理及面试问题,linux服务器搭建教程视频

Java 程序员 后端

Github已星标180K又一神作,阿里巴巴内部并发编程笔记

Java 程序员 后端

HashMap(jdk1,linux学习路线图

Java 程序员 后端

Flink on Yarn三部曲之一:准备工作,java开发校招面试题

Java 程序员 后端

Flink1,java从入门到精通第四版pdf下载

Java 程序员 后端

Google 面试六轮游,结果还是没过!Google面试真题分享

Java 程序员 后端

Github神作!2021Java秋招高级面试指南,吃透至少阿里P6

Java 程序员 后端

Git,GitHub与GitLab的区别,java框架开发面试题

Java 程序员 后端

内卷把同事逼成了“扫地僧”,把Git上所有面试题足足整理24W 字

Java spring 程序员 mybatis SpringCloud

flex 布局详解,我是如何收割多家大厂offer的

Java 程序员 后端

Java agent还不了解的程序员该反省一下了(1)

Java 程序员 后端

Hadoop分布式高可用HA集群搭建笔记(含Hive之构建)

Java 程序员 后端

HTML笔记 —— 表单,java数组的底层原理

Java 程序员 后端

HTTP 2,实战篇

Java 程序员 后端

Java 8 Lambda 表达式和 Stream 操作,网易资深Java架构师

Java 程序员 后端

Java 专项练习【1 - 10】,java常见算法面试题

Java 程序员 后端

Java 多线程 —— 同步代码块(1),狂神说docker进阶笔记

Java 程序员 后端

git(8)Git 与其他系统,高性能mysql第四版pdf百度云

Java 程序员 后端

git(9)Git 内部原理,nginx模块工作原理

Java 程序员 后端

IM即时消息系统在荔枝的落地实践

IM即时消息系统在荔枝的落地实践

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章