AI实践哪家强?来 AICon, 解锁技术前沿,探寻产业新机! 了解详情
写点什么

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

  • 2019-09-26
  • 本文字数:6791 字

    阅读完需:约 22 分钟

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

点击流分析工具可以很好地处理数据,有些工具甚至具有令人印象深刻的 BI 界面。但是,孤立地分析点击流数据存在很多限制。例如,客户对您网站上的产品或服务感兴趣,却在您的实体店购买。点击流分析师会问:“他们浏览产品之后发生了什么?”,而商业分析师会问:“他们购买之前发生了什么?”


点击流数据可以增强您的其他数据源,这并不足为奇。如果结合购买数据使用,它有助于您确定要放弃购买的产品或优化营销支出。同样,它还可以帮助您分析线上和线下行为,甚至是客户在注册帐户之前的行为。但是,一旦点击流数据馈送的好处显现,您必须快速适应新的请求。


这篇文章介绍了我们在 Equinox Fitness Clubs 如何将数据从 Amazon Redshift 迁移到 Amazon S3,以便对点击流数据使用后期绑定视图策略。我们将在这篇文章中广泛讨论 Apache Spark、Apache Parquet、数据湖、hive 分区和外部表等有趣的内容!


当我们开始将点击流数据从其自有工具传送到我们的 Amazon Redshift 数据仓库时,速度是主要考虑事项。我们的初始用例是将 Salesforce 数据与 Adobe Analytics 数据合并在一起,以便深入了解我们的潜在客户开发流程。Adobe Analytics 可以告诉我们潜在客户来自哪些渠道和宣传活动、访问期间浏览了哪些网页以及是否在我们的网站上提交了潜在客户表单。Salesforce 可以告诉我们潜在客户是否符合条件、是否咨询过顾问以及最终是否注册会员。将这两个数据集合并在一起有助于我们更好地理解和优化我们的营销策略。


首先,我们来了解将 Salesforce 和 Adobe Analytics 数据集中到 Amazon Redshift 所涉及的步骤。但即使在 Redshift 中合并在一起,也需要一个通用标识符才能进行交互。第一步是在我们的网站上提交潜在客户表单时,生成 GUID 并将相同的 GUID 发送到 Salesforce 和 Adobe Analytics。



接下来,我们要将 Salesforce 数据传送到 Redshift。幸运的是,这些馈送已存在,因此我们可以在馈送中添加新的 GUID 属性并在 Redshift 中对其进行描述。


同样,我们必须生成从 Adobe Analytics 到 Amazon Redshift 的数据馈送。Adobe Analytics 提供了 Amazon S3 作为我们数据的目标选项,因此我们将数据传送给 S3,然后创建作业将其发送到 Redshift。此作业涉及到获取每日 Adobe Analytics 馈送,并附有一个含有数百列和数十万行的数据文件、一系列查找文件(如数据标题)和一个描述已发送文件的清单文件 – 然后以其原始状态全部传送至 Amazon S3。接着,我们使用 Amazon EMR 和 Apache Spark 将数据馈送文件处理为单个 CSV 文件,然后将其保存到 S3,以便我们执行 COPY 命令将数据发送到 Amazon Redshift。


此作业持续了几周时间,在我们开始频繁使用数据之前运行良好。虽然作业效率很高,但新列数据发生了日期回溯的情况(模式演变)。因此我们确定需要更大的灵活性,这是由数据的性质决定的。

数据湖补救

当我们决定重构作业时,我们做了两项准备工作。首先,我们逐渐开始采用更多的数据湖策略。其次,近期发布Redshift Spectrum。它使我们能在数据湖中查询点击流数据的平面文件,而无需运行 COPY 命令并将其存储在 Redshift 中。此外,我们可以更有效地将点击流数据合并到存储在 Redshift 中的其他数据源。


我们想利用自描述数据,该数据将数据模式与数据本身相结合。将数据转换为自描述数据有助于我们管理广泛的点击流数据集,并防止模式演变相关的挑战。我们可以将所需的每一列都放入数据湖文件中,然后只使用查询中重要的列以加快处理。为实现这种灵活性,我们使用的是 Apache Parquet 文件格式,因为其列式存储技术,该格式不仅具有自描述性,而且速度很快。我们在 Amazon EMR 上使用 Apache Spark 将 CSV 转换为 Parquet 格式,并对数据分区以获区扫描性能,如以下代码所示。



from datetime import date, timedeltafrom pyspark.sql.types import *from pyspark.sql import SparkSessionimport jsonimport argparse # Usage# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31
parser = argparse.ArgumentParser()parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')parser.add_argument('application', help='App name for job',type=str, default='XXX')args = parser.parse_args()
spark = SparkSession\ .builder\ .appName(args.application)\ .getOrCreate()
sc = spark.sparkContext
def manifest_toJSON(file, location): text = sc.textFile(file).collect() manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0} for x in text: if 'Lookup-File:' in x: manifest['lookup_files'].append(location+x.split(': ')[1]) elif 'Data-File: 01' in x: wildcard_path = x.replace('Data-File: 01','*') manifest['data_files'] += wildcard_path elif 'Record-Count:' in x: manifest['total_rows'] += int(x.split(': ')[1]) return manifest
# Create metadata by stitching together the file paths for# the header file and the data file from the manifest file# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)metadata = manifest_toJSON(manifest_filepath, base_filepath)
# Create a list of files and their data# Look specifically for the column_headers.tsv data# Split on \x00 to remove the garbage encoding and return a string of headerslookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\ .replace('\n', '')\ .replace('(', '')\ .replace(')', '')\ .replace(' ', '-')
# Create a schema for the list from the header file splitting on tabs# Cast everything as a string to avoid data type failuresschema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])
# Bypass RDD and write data file as a dataframe# then save as parquet to tie headers to their respective valuesdf = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)df.write.mode('overwrite').parquet(destination_filepath)
# Gracefully exit out of spark and this filesc.stop()exit()
复制代码


借助 AWS Glue Data Catalog,我们可以在 Amazon Redshift 和其他查询工具(如 Amazon Athena 和 Apache Spark)中查询可用的点击流数据。这是通过将 Parquet 文件映射到关系模式来实现的。AWS Glue 允许在几秒钟内查询其他数据。这是因为模式会实时发生更改。这意味着您可以同时完成列删除/添加、列索引重新排序和列类型更改。然后,就可以在保存模式后立即查询数据。此外,Parquet 格式可防止数据形状发生变化时出现故障,或者放弃和删除数据集中的某些列。


我们使用以下查询为 Adobe Analytics 网站数据创建了第一个 AWS Glue 表。我们在 SQL Workbench 的 Amazon Redshift 中运行了此查询。


--First create your schemacreate external schema omniture_prodfrom data catalog database 'omniture' iam_role 'arn:aws:iam:::role
--Then create your “table” CREATE EXTERNAL TABLE omniture_prod.eqx_web ( date_time VARCHAR, va_closer_id VARCHAR, va_closer_detail VARCHAR, va_finder_detail VARCHAR, va_finder_id VARCHAR, ip VARCHAR, domain VARCHAR, post_evar1 VARCHAR)STORED AS PARQUETLOCATION 's3://eqxdl-prod/omniture/eqx_web/'table properties ('parquet.compress'='SNAPPY');
--Check your databases, schemas, and tablesselect * from pg_catalog.svv_external_databases;select * from pg_catalog.svv_external_schemas;select * from pg_catalog.svv_external_tables;
复制代码


运行此查询后,我们通过 AWS Glue 界面根据请求对 schema 添加了其他列。我们还使用分区来加快查询和降低成本。



此时,我们的数据库中有个新的 schema 文件夹。它包含可以查询的外部表,但我们希望更进一步。我们需要增加一些数据转换,例如:


  • 将 ID 重命名为字符串

  • 连接值

  • 操作字符串,不包括我们从 AWS 发送的用来测试网站的 bot 流量

  • 更改列名称以方便使用。

  • 为此,我们创建了如下所示的外部表视图:


create view edw_t.f_omniture_web as select    REPLACE(dt, '-', '') as hive_date_key,    va_closer_id,    va_closer_detail as last_touch_campaign,    CASE        WHEN (va_closer_id) = '1' THEN 'Paid Search'        WHEN (va_closer_id) = '2' THEN 'Natural Search'        WHEN (va_closer_id) = '3' THEN 'Display'        WHEN (va_closer_id) = '4' THEN 'Email Acq'        WHEN (va_closer_id) = '5' THEN 'Direct'        WHEN (va_closer_id) = '6' THEN 'Session Refresh'        WHEN (va_closer_id) = '7' THEN 'Social Media'        WHEN (va_closer_id) = '8' THEN 'Referring Domains'        WHEN (va_closer_id) = '9' THEN 'Email Memb'        WHEN (va_closer_id) = '10' THEN 'Social Placement'        WHEN (va_closer_id) = '11' THEN 'Other Placement'        WHEN (va_closer_id) = '12' THEN 'Partnership'        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS last_touch_channel,    va_finder_detail as first_touch_campaign,    va_finder_id as va_finder_id,    CASE        WHEN (va_finder_id) = '1' THEN 'Paid Search'        WHEN (va_finder_id) = '2' THEN 'Natural Search'        WHEN (va_finder_id) = '3' THEN 'Display'        WHEN (va_finder_id) = '4' THEN 'Email Acq'        WHEN (va_finder_id) = '5' THEN 'Direct'        WHEN (va_finder_id) = '6' THEN 'Session Refresh'        WHEN (va_finder_id) = '7' THEN 'Social Media'        WHEN (va_finder_id) = '8' THEN 'Referring Domains'        WHEN (va_finder_id) = '9' THEN 'Email Memb'        WHEN (va_finder_id) = '10' THEN 'Social Placement'        WHEN (va_finder_id) = '11' THEN 'Other Placement'        WHEN (va_finder_id) = '12' THEN 'Partnership'        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS first_touch_channel,    ip as ip_address,    domain as domain,    post_evar1 AS internal_compaign,    post_evar10 as site_subsection_nm,    post_evar11 as IOS_app_view_txt,    post_evar12 AS site_section_nm,    post_evar15 AS transaction_id,    post_evar23 as join_barcode_id,    post_evar3 AS page_nm,    post_evar32 as host_nm,    post_evar41 as class_category_id,    post_evar42 as class_id,    post_evar43 as class_instance_id,    post_evar60 AS referral_source_txt,    post_evar69 as adwords_gclid,    post_evar7 as usersec_tracking_id,    post_evar8 as facility_id,    post_event_list as post_event_list,     post_visid_low||post_visid_high as unique_adobe_id,    post_visid_type as post_visid_type,    post_page_event as hit_type,    visit_num as visit_number,    visit_start_time_gmt,    post_evar25 as login_status,    exclude_hit as exclude_hit,    hit_source as hit_source,    geo_zip,    geo_city,    geo_region,    geo_country,    post_evar64 as api_error_msg,    post_evar70 as page_load_time,    post_evar78 as join_transaction_id,    post_evar9 as page_url,    visit_start_pagename as entry_pg,    post_tnt as abtest_campaign,    post_tnt_action as abtest_experience,    user_agent as user_agent,    mobile_id as mobile_id,    cast(date_time as timestamp) as date_time,    CONVERT_TIMEZONE(        'America/New_York', -- timezone of origin        (cast(            case             when post_t_time_info like '%undefined%' then '0'            when post_t_time_info is null then '0'            when post_t_time_info = '' then '0'            when cast(split_part(post_t_time_info,' ',4) as int) < 0              then left(split_part(post_t_time_info,' ',4),4)            else left(split_part(post_t_time_info,' ',4),3) end as int        )/60),        cast(date_time as timestamp)    ) as date_time_local,    post_t_time_info as local_timezonefrom omniture_prod.eqx_webwhere exclude_hit = '0'and hit_source not in ('5','7','8','9')
and domain <> 'amazonaws.com'and domain <> 'amazon.com'
WITH NO SCHEMA BINDING;

复制代码


现在,我们可以从 Amazon Redshift 执行查询,将我们的结构化 Salesforce 数据与半结构化动态 Adobe Analytics 数据相结合。通过这些更改,我们的数据变得极其灵活、对存储大小非常友好、查询特别高效。从那时起,我们开始将 Redshift Spectrum 用于很多用例,包括数据质量检查、机器数据、历史数据存档,使我们的数据分析师和科学家能够更轻松地混合和加载数据。


with web_leads as (  select transaction_id,last_touch_channel  from edw_t.f_omniture_web  where hive_date_key = '20170301'      and post_event_list like '%201%'      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'),opp_lifecycle as (  SELECT lifecycle,weblead_transactionid  FROM edw_t.f_opportunity  where weblead_transactionid is not null  and created_date_key between '20170220'and '20170310')select  web_leads.transaction_id,  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecyclefrom web_leadsleft join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

复制代码

小结

通过将 Amazon S3 数据湖与 Amazon Redshift 合并,我们能为点击流数据构建高效灵活的分析平台。从而无需始终将点击流数据加载到数据仓库中,并且使平台适应传入数据中 schema 更改。请阅读 Redshift Spectrum 入门文档,还可以在下面观看我们在 AWS Chicago Summit 2018 上的演讲。


相关文章:


如果觉得这篇文章有用,请务必阅读从数据湖到数据仓库:利用 Amazon Redshift Spectrum 增强 Customer 360 和 Narrativ 通过 Amazon Redshift 帮助创建者将其数字内容货币化。


作者介绍:


Ryan Kelly 是 Equinox 的数据架构师,帮助草拟和实施数据计划框架。他还负责点击流跟踪,帮助团队深入了解他们的数字计划。Ryan 热衷于让人们更方便地访问和提取他们的数据,以获取商业智能、执行分析和丰富产品/服务。他还喜欢探索和审查新技术,了解是如何改善他们在 Equinox 的工作。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/


2019-09-26 10:42704
用户头像

发布了 1913 篇内容, 共 148.4 次阅读, 收获喜欢 81 次。

关注

评论

发布
暂无评论
发现更多内容

苹果mac流行的API开发工具:Postman

Rose

好用的交互式动画界面设计神器 Principle for Mac汉化版

Rose

3D渲染和动画制作软件 KeyShot破解版 附永久许可证及安装教程

Rose

轻松搞定平稳运行,数据库平台 DBStack 帮助 DBA 运维不同基础设施上的各类数据库

Baidu AICLOUD

数据库

告别 Kafka,拥抱 Databend:构建高效低成本的用户行为分析体系

Databend

mac免费的投屏软件duet,帮助用户把mac的屏幕分享到移动设备的应用

Rose

axure rp8中文安装包 附axure rp永久密钥

Rose

One Switch for Mac(菜单栏一键开关控制神器)v1.33.1中文版

Rose

harmony_flutter 自定义toast

flfljh

HarmonyOS harmony

Eudic 欧路词典:多语种翻译神器,精准释义与例句一应俱全

Rose

2024-11-27:字符串的分数。用go语言,给定一个字符串 s,我们可以定义其“分数”为相邻字符的 ASCII 码差值绝对值的总和。 请计算并返回字符串 s 的分数。 输入:s = “hello“

福大大架构师每日一题

福大大架构师每日一题

中昊芯英基于国产TPU算力服务器在高校中的应用,助力太极股份成功入选特色案例

科技热闻

区块链游戏的新观察:自治世界能否成为未来链游的突破口?

区块链软件开发推广运营

dapp开发 区块链开发 链游开发 NFT开发 公链开发

字节跳动开源云原生数据仓库 ByConity 有奖众测,邀你体验完整的数仓能力

InfoQ写作社区官方

有奖活动 开发 热门活动

Pixelmator Pro for Mac 非常强大、美观且易于使用的图像编辑器

Rose

GreptimeDB Edge 2.0 车端多模态数据库线上发布会

Greptime 格睿科技

数据库 车联网 汽车

需求管理的主要内容包括哪些

易成研发中心

需求管理 需求管理工具

disk drill mac 破解版 附disk drill 激活码 好用的苹果数据恢复软件

Rose

TinyEngine低代码引擎2.0新特性介绍

OpenTiny社区

低代码 OpenTiny TinyEngine 前端开源

金狗出没时间揭秘:为什么凌晨成了 Meme 币的神盘时段?

区块链软件开发推广运营

交易所开发 dapp开发 链游开发 公链开发 代币开发

GoodNotes 5 - 笔记、绘图、文档管理一站式搞定

Rose

鸿蒙Flutter 常见问题总结

flfljh

鸿蒙 Next 中 Text 组件用法总结

flfljh

揭秘淘宝天猫API接口:轻松获取商品详情与优惠券券后价

代码忍者

API 接口 pinduoduo API

安能物流 All in TiDB 背后的故事与成果

PingCAP

数据库 TiDB

2024华为云开源开发者论坛项目抢鲜看|Kmesh: 监控指标和访问日志功能详解

华为云原生团队

云计算 开源 容器 云原生

印象笔记发布全新知识管理平台 AI让你的第二个大脑持续升级

E科讯

鸿蒙 next 封装日志工具类 LogUtil

flfljh

解锁淘宝天猫数据宝藏:深度探索商品描述、详情图与评论的API调用新视角

代码忍者

API 接口 pinduoduo API

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章