手把手教你使用 Amazon EMR 进行交互式数据查询

阅读数:13 2019 年 11 月 20 日 08:00

手把手教你使用Amazon EMR进行交互式数据查询
本文将带您一步步完成一个利用 Amazon EMR 进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将 ** 整个过程自动化 ** 的方法。其中涉及的 EMR 组件主要包括: Hive, Hadoop, presto, sqoop。除 EMR 外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是 cloudfront 产生的日志。
复制代码
在按照本文档进行操作之前,读者需了解 S3,RDS 并能够进行基本的 S3,RDS 的操作,读者需了解 EMR 的基本概念。以下是参考资料:
什么是 EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR 让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
什么是 S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为 EMR 提供文件存储服务。
什么是 RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
** 准备工作 **
1. Cloudfront 生成的日志已经存储在 s3 桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中 testcloudfrontlog 是 s3 存储桶的名字,在实际操作的时候,需要换一个名字,因为 s3 存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到 s3://testcloudfrontlog/log 目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是 s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做 hive 表的数据存储,目录是 s3://testcloudfrontlog/logpart
4. 注意:直接 copy 本文的代码有可能会由于字符原因出现错误,建议先 copy 到纯文本编辑器中再执行。
** 手动方式完成交互式数据查询 **
第一步,创建一个 EMR 集群,方法如下:
1. 进入到 AWS 的控制台,选择 EMR 服务,点击创建集群。
!
2. 点击转到高级选项
!
3. 软件配置
选择 EMR 发行版本以及所需要的软件, 在本例中,我们选择 emr-5.0.0 版本,所需要的工具选择 hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京 Region,在输入配置中输入以下内容,使得 presto 可访问 s3, 如果使用其他 Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
!
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
!
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
!
6. 安全选项
在 EC2 键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从 SSH 客户端登录到集群中的任意一台服务器。如果选择“在没有 EC2 键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。! 7. 修改安全组规则,并登录 EMR 的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加 SSH 的访问规则,这样才可以通过 SSH 的方式从外部机器登录到主节点。然后通过任意一个 SSH 客户端登录到主节点,目标地址是图中所示的主节点共有 DNS, 用户名是 hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
!
第二步,创建数据表并进行查询
1. SSH 到主节点后,执行 hive 命令,进入到 hive 命令行界面
!
2. 创建一个用日期作为分区的 hive 表,用来作为最终被查询的表
将以下脚本 copy 到 hive> 提示符下执行,注意 LOCATION 的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入 quit 命令,退出 hive。用 aws s3 命令将 s3://testcloudfrontlog/log 中日志 copy 到 s3://testcloudfrontlog/logbydate 中按照时间划分的目录下
2016-07-11 这一天的文件为例,命令如下,注意,s3 目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了 aws s3 命令行工具。你可以在 EMR 主节点中退出 hive 命令行程序,然后执行以上命令。或者在任意一个安装了 aws cli 工具并配置了 s3 访问权限的机器中执行。本例中直接在 EMR 的主节点中执行。aws s3 cp 不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对 s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个 HIVE 表。
假设表的名字是 cloudfrontlog20160711, 输入 hive, 重新进入到 hive 命令行工具,并输入以下语句,注意 LOCATION 的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个 hive 表,通过在 hive 命令行工具中执行 show tables 命令,可以查看到两个 hive 表。
!
向带分区的 hive 表中注入 711 日的数据,在 hive 命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是 INSERT 语句执行过程的显示信息。
!
完成后,如果进入到 s3://testcloudfrontlog/logpart 目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用 hive 做查询,也可以进入 presto 做查询。这里,我们使用 presto, 由于 presto 完全使用内存进行计算, 速度更快。进入 presto 的方式如下:
执行 quit, 退出 hive 命令行程序。
然后执行以下语句进入 presto 命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示 presto 使用 hive 数据源,并且使用 hive 数据源中的 default 数据库。细节请参考 presto 的社区文档。
!
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
!
然后退出 presto 命令行工具, 输入 quit;
第四步,删除 EMR 集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
!
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR 更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得 EMR 的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
** 自动方式完成交互式数据查询 **
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从 /log 目录向 /logbydate 目录中 copy 特定日期的文件。
– 创建对应特定日期的 hive 表, 例如表名为 cloudfrontlog20160711。
– 从 cloudfrontlog20160711 向 cloudfrontlogpart 表中注入数据。
– 使用 presto 从 cloudfrontlogpart 表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将 hive 元数据放在集群的外部。
在手动执行的流程中,创建 hive 表后,hive 的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储 hive 的元数据。
针对以上的几个需求,在 EMR 中对应的解决方法如下.
1. 使用 EMR 的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用 EMR 的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储 hive 元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个 mysql 数据库分别用来存储 hive 元数据和查询结果,可以使用 AWS 的 RDS 服务来创建。这两个数据库都需要能被 EMR 访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是 loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是 loganalytb。根据后面所进行的查询, 使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储 hive 元数据的数据库,暂时不必创建,从后面提到的 emrconf.json 文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate 目录下的数据
2) s3://testcloudfrontlog/logpart 目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本 loganaly.sh
`#!/bin/bash`
`# 变量 KEYNAMES 是你的 SSH key 的名字, 用来通过 SSH 方式访问 EC2。`
`KEYNAME=yourkeyname`
`# 变量 CONFIGFILE 是 emrconf.json 的 url 地址,emrconf.json 这个文件中包含了存储 hive 元数据的外部数据库的信息。将该文件存在 s3 中并让这个文件能够从外部访问到。emrconf.json 中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR 集群产生的日志所存放的 s3 目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar 包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront 日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的 hive 表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被 sqoop 使用,数据会被传到 mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的 mysql 的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR 集群的配置信息, 这里是以 Oregan region 为例。如果使用的是北京 region,AWSREGION 用 cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`
`# 创建 emr 集群, 名字是 loganaly, 其中:`
`# --auto-terminate 参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations 参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改 Hive 元数据的存储位置。`
`# --step 参数规定了该集群在创建后要执行的几个任务,其中 Type=Hive 的 step, 需要给出包含 hive 语句的文件作为参数。而 Type=CUSTOM_JAR 的 step, 需要给出一个 JAR 包,这里我们使用 EMR 提供 scrip-runner.jar, 它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups 参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
{1}
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
{1}
`--use-default-roles \`
{1}
`--ec2-attributes KeyName=$KEYNAME \`
{1}
`--termination-protected \`
{1}
`--auto-terminate \`
{1}
`--configurations $CONFIGFILE \`
{1}
`--enable-debugging \`
{1}
`--log-uri $LOGURI \`
{1}
`--steps \`
{1}
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
{1}
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
{1}
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
{1}
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
{1}
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
{1}
`--instance-groups \`
{1}
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
{1}
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
{1}
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
{1}
2. 准备 cpjar.sh 脚本
{1}
作用是将 mysql 的 JDBC 驱动包下载到本地的 Sqoop 目录下,sqoop 在将文件转存到数据库的时候会用到 JDBC 驱动。
{1}
在国外 region, 使用以下脚本:
{1}
`#!/bin/bash`
{1}
`CODEDIR=$1`
{1}
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
{1}
在北京 Region, 使用以下脚本:
{1}
`#!/bin/bash`
{1}
`CODEDIR=$1`
{1}
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
{1}
3. 准备 log2logbydate.sh 脚本
{1}
做用是将原始日志文件 copy 到按天划分的目录下。
{1}
`#!/bin/bash`
{1}
`LOGSOURCEDIR=$1`
{1}
`STAGINGDIR=$2`
{1}
`DATE=$3`
{1}
`DATEE=$4`
{1}
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
{1}
4. 准备 hivetables.q 脚本
{1}
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
{1}
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
{1}
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
{1}
`)`
{1}
`PARTITIONED BY (datee Date)`
{1}
`LOCATION '${PARTDIRh}';`
{1}
{1}
{1}
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
{1}
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
{1}
`)`
{1}
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
{1}
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
{1}
`tblproperties("skip.header.line.count"="2");`
{1}
{1}
{1}
`--make dynamic insert available`
{1}
`set hive.exec.dynamic.partition.mode=nonstrict;`
{1}
{1}
{1}
`--insert data.`
{1}
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
{1}
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
{1}
`FROM cloudfrontlog${DATEh};`
{1}
{1}
{1}
`--delete the staging table`
{1}
`DROP TABLE cloudfrontlogstaging${DATEh};`
{1}
5. 准备 presto2s3.sh 脚本
{1}
作用包括: 查询 hive 表,并将结果存成.csv 格式的文件,将该文件上传到 S3 中相应的目录下。
{1}
`#!/bin/bash`
{1}
`TIME=$(date +%H%M%S)`
{1}
`SQOOPFILE=$1`
{1}
`DATE=$2`
{1}
`DATEE=$3`
{1}
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
{1}
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
{1}
6. 准备 s3tomysql.sh 脚本
{1}
作用是利用 sqoop 将 s3 中的.csv 文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的 max_allowed_packet 参数。
{1}
`#!/bin/bash`
{1}
`JDBCURL=$1`
{1}
`DBUSER=$2`
{1}
`DBPASS=$3`
{1}
`SQOOPFILE=$4`
{1}
`DATE=$5`
{1}
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
{1}
7. 准备 emrconf.json 脚本
{1}
作用是使得创建起来的 EMR 集群的 hive 元数据存储在外部数据库中。注意: 根据准备工作中创建的数据库来修改文件中的 ConnectionUserName、ConnectionPassword、ConnectionURL 三个参数。
{1}
`[`
{1}
`{"Classification":"hive-site",`
{1}
`"Properties":{`
{1}
`"javax.jdo.option.ConnectionUserName":"username",`
{1}
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
{1}
`"javax.jdo.option.ConnectionPassword":"password",`
{1}
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`}`
{1}
`]`
{1}
注意:如果是在北京 Region, emrconf.json 使用以下脚本
{1}
`[`
{1}
`{"Classification":"hive-site",`
{1}
`"Properties":{`
{1}
`"javax.jdo.option.ConnectionUserName":"username",`
{1}
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
{1}
`"javax.jdo.option.ConnectionPassword":"password",`
{1}
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`},`
{1}
`{"Classification":"presto-connector-hive",`
{1}
`"Properties":{`
{1}
`"hive.s3.pin-client-to-current-region":"true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`}`
{1}
`]`
{1}
8. 准备 jar 包
{1}
需要两个 jar,一个是 script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
{1}
另一个是 mysql 的 JDBC 驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
{1}
9. 上传文件
{1}
在 s3 中创建 s3://testcloudfrontlog/conf/ 目录,并将第 8 步中的两个 jar 包,以及 cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json, 上传到该目录。
{1}
由于 emrconf.json 需要通过 http 的方式访问到, 在 s3 中将 emrconf.json 的访问权限增加“所有人可下载”。
{1}
cloudfront 日志文件 copy 到 s3://testcloudfrontlog/log 目录下, 两个示例文件下载地址:
{1}
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
{1}
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
{1}
如果在手动流程中已经上传了日志文件,则不必再上传。
{1}
第三步,执行程序
{1}
暂时将主程序 loganaly.sh 中的 DATE 和 DATEE 参数修改为示例数据的时间,例如, 分别写成 20160711 和 2016-07-11,在任意一个 Linux 系统中运行主程序脚本 loganaly.sh(例如,可以使用 EC2 实例)。但需注意:
{1}
1) 该机器需要安装了 AWS 命令行工具,并具有 s3 和 EMR 的操作权限。
{1}
2) 该机器能访问到存储数据结果的数据库, 因为 loganaly.sh 中有对该数据库的操作。
{1}
集群创建成功后,自动执行每个 step 中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个 Step 的执行:
{1}
!{1}
{1}
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
{1}
!{1}
{1}
如果要想每天执行 loganaly.sh 脚本并对前一天的数据进行处理和分析,将 loganaly.sh 中的 DATE 和 DATEE 分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个 crontab 定时任务,每天定时执行 loganaly.sh.
{1}
如果是在 AWS 中国以外的 region 执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将 loganaly.sh 中对创建 EMR 集群的脚本稍做修改,增加 BidPrice 参数:
{1}
`--instance-groups \`
{1}
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
{1}
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
{1}
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
{1}
第四步,删除资源
{1}
为避免额外花费,删除本实验过程中 (包括手动过程以及自动过程中) 创建的资源,S3, EC2 等资源。
{1}
#### 总结
{1}
本文中使用 Cloudfront 日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了 EMR 中的 Hive,Presto,Sqoop 工具,但 EMR 还有更多的工具 (例如 Spark) 可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
{1}
** 作者介绍:**
{1}
!{1}
{1}
韩小勇
{1}
AWS 解决方案架构师,负责基于 AWS 的云计算方案架构咨询和设计,实施和推广,在加入 AWS 之前,从事电信核心网系统上云的方案设计及标准化推广 。
{1}
本文将带您一步步完成一个利用 Amazon EMR 进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将 ** 整个过程自动化 ** 的方法。其中涉及的 EMR 组件主要包括: Hive, Hadoop, presto, sqoop。除 EMR 外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是 cloudfront 产生的日志。
复制代码
在按照本文档进行操作之前,读者需了解 S3,RDS 并能够进行基本的 S3,RDS 的操作,读者需了解 EMR 的基本概念。以下是参考资料:
什么是 EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR 让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
什么是 S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为 EMR 提供文件存储服务。
什么是 RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
** 准备工作 **
1. Cloudfront 生成的日志已经存储在 s3 桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中 testcloudfrontlog 是 s3 存储桶的名字,在实际操作的时候,需要换一个名字,因为 s3 存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到 s3://testcloudfrontlog/log 目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是 s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做 hive 表的数据存储,目录是 s3://testcloudfrontlog/logpart
4. 注意:直接 copy 本文的代码有可能会由于字符原因出现错误,建议先 copy 到纯文本编辑器中再执行。
** 手动方式完成交互式数据查询 **
第一步,创建一个 EMR 集群,方法如下:
1. 进入到 AWS 的控制台,选择 EMR 服务,点击创建集群。
!
2. 点击转到高级选项
!
3. 软件配置
选择 EMR 发行版本以及所需要的软件, 在本例中,我们选择 emr-5.0.0 版本,所需要的工具选择 hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京 Region,在输入配置中输入以下内容,使得 presto 可访问 s3, 如果使用其他 Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
!
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
!
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
!
6. 安全选项
在 EC2 键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从 SSH 客户端登录到集群中的任意一台服务器。如果选择“在没有 EC2 键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。! 7. 修改安全组规则,并登录 EMR 的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加 SSH 的访问规则,这样才可以通过 SSH 的方式从外部机器登录到主节点。然后通过任意一个 SSH 客户端登录到主节点,目标地址是图中所示的主节点共有 DNS, 用户名是 hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
!
第二步,创建数据表并进行查询
1. SSH 到主节点后,执行 hive 命令,进入到 hive 命令行界面
!
2. 创建一个用日期作为分区的 hive 表,用来作为最终被查询的表
将以下脚本 copy 到 hive> 提示符下执行,注意 LOCATION 的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入 quit 命令,退出 hive。用 aws s3 命令将 s3://testcloudfrontlog/log 中日志 copy 到 s3://testcloudfrontlog/logbydate 中按照时间划分的目录下
2016-07-11 这一天的文件为例,命令如下,注意,s3 目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了 aws s3 命令行工具。你可以在 EMR 主节点中退出 hive 命令行程序,然后执行以上命令。或者在任意一个安装了 aws cli 工具并配置了 s3 访问权限的机器中执行。本例中直接在 EMR 的主节点中执行。aws s3 cp 不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对 s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个 HIVE 表。
假设表的名字是 cloudfrontlog20160711, 输入 hive, 重新进入到 hive 命令行工具,并输入以下语句,注意 LOCATION 的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个 hive 表,通过在 hive 命令行工具中执行 show tables 命令,可以查看到两个 hive 表。
!
向带分区的 hive 表中注入 711 日的数据,在 hive 命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是 INSERT 语句执行过程的显示信息。
!
完成后,如果进入到 s3://testcloudfrontlog/logpart 目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用 hive 做查询,也可以进入 presto 做查询。这里,我们使用 presto, 由于 presto 完全使用内存进行计算, 速度更快。进入 presto 的方式如下:
执行 quit, 退出 hive 命令行程序。
然后执行以下语句进入 presto 命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示 presto 使用 hive 数据源,并且使用 hive 数据源中的 default 数据库。细节请参考 presto 的社区文档。
!
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
!
然后退出 presto 命令行工具, 输入 quit;
第四步,删除 EMR 集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
!
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR 更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得 EMR 的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
** 自动方式完成交互式数据查询 **
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从 /log 目录向 /logbydate 目录中 copy 特定日期的文件。
– 创建对应特定日期的 hive 表, 例如表名为 cloudfrontlog20160711。
– 从 cloudfrontlog20160711 向 cloudfrontlogpart 表中注入数据。
– 使用 presto 从 cloudfrontlogpart 表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将 hive 元数据放在集群的外部。
在手动执行的流程中,创建 hive 表后,hive 的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储 hive 的元数据。
针对以上的几个需求,在 EMR 中对应的解决方法如下.
1. 使用 EMR 的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用 EMR 的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储 hive 元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个 mysql 数据库分别用来存储 hive 元数据和查询结果,可以使用 AWS 的 RDS 服务来创建。这两个数据库都需要能被 EMR 访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是 loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是 loganalytb。根据后面所进行的查询, 使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储 hive 元数据的数据库,暂时不必创建,从后面提到的 emrconf.json 文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate 目录下的数据
2) s3://testcloudfrontlog/logpart 目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本 loganaly.sh
`#!/bin/bash`
`# 变量 KEYNAMES 是你的 SSH key 的名字, 用来通过 SSH 方式访问 EC2。`
`KEYNAME=yourkeyname`
`# 变量 CONFIGFILE 是 emrconf.json 的 url 地址,emrconf.json 这个文件中包含了存储 hive 元数据的外部数据库的信息。将该文件存在 s3 中并让这个文件能够从外部访问到。emrconf.json 中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR 集群产生的日志所存放的 s3 目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar 包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront 日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的 hive 表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被 sqoop 使用,数据会被传到 mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的 mysql 的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR 集群的配置信息, 这里是以 Oregan region 为例。如果使用的是北京 region,AWSREGION 用 cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`
`# 创建 emr 集群, 名字是 loganaly, 其中:`
`# --auto-terminate 参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations 参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改 Hive 元数据的存储位置。`
`# --step 参数规定了该集群在创建后要执行的几个任务,其中 Type=Hive 的 step, 需要给出包含 hive 语句的文件作为参数。而 Type=CUSTOM_JAR 的 step, 需要给出一个 JAR 包,这里我们使用 EMR 提供 scrip-runner.jar, 它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups 参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
{1}
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
{1}
`--use-default-roles \`
{1}
`--ec2-attributes KeyName=$KEYNAME \`
{1}
`--termination-protected \`
{1}
`--auto-terminate \`
{1}
`--configurations $CONFIGFILE \`
{1}
`--enable-debugging \`
{1}
`--log-uri $LOGURI \`
{1}
`--steps \`
{1}
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
{1}
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
{1}
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
{1}
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
{1}
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
{1}
`--instance-groups \`
{1}
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
{1}
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
{1}
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
{1}
2. 准备 cpjar.sh 脚本
{1}
作用是将 mysql 的 JDBC 驱动包下载到本地的 Sqoop 目录下,sqoop 在将文件转存到数据库的时候会用到 JDBC 驱动。
{1}
在国外 region, 使用以下脚本:
{1}
`#!/bin/bash`
{1}
`CODEDIR=$1`
{1}
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
{1}
在北京 Region, 使用以下脚本:
{1}
`#!/bin/bash`
{1}
`CODEDIR=$1`
{1}
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
{1}
3. 准备 log2logbydate.sh 脚本
{1}
做用是将原始日志文件 copy 到按天划分的目录下。
{1}
`#!/bin/bash`
{1}
`LOGSOURCEDIR=$1`
{1}
`STAGINGDIR=$2`
{1}
`DATE=$3`
{1}
`DATEE=$4`
{1}
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
{1}
4. 准备 hivetables.q 脚本
{1}
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
{1}
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
{1}
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
{1}
`)`
{1}
`PARTITIONED BY (datee Date)`
{1}
`LOCATION '${PARTDIRh}';`
{1}
{1}
{1}
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
{1}
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
{1}
`)`
{1}
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
{1}
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
{1}
`tblproperties("skip.header.line.count"="2");`
{1}
{1}
{1}
`--make dynamic insert available`
{1}
`set hive.exec.dynamic.partition.mode=nonstrict;`
{1}
{1}
{1}
`--insert data.`
{1}
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
{1}
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
{1}
`FROM cloudfrontlog${DATEh};`
{1}
{1}
{1}
`--delete the staging table`
{1}
`DROP TABLE cloudfrontlogstaging${DATEh};`
{1}
5. 准备 presto2s3.sh 脚本
{1}
作用包括: 查询 hive 表,并将结果存成.csv 格式的文件,将该文件上传到 S3 中相应的目录下。
{1}
`#!/bin/bash`
{1}
`TIME=$(date +%H%M%S)`
{1}
`SQOOPFILE=$1`
{1}
`DATE=$2`
{1}
`DATEE=$3`
{1}
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
{1}
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
{1}
6. 准备 s3tomysql.sh 脚本
{1}
作用是利用 sqoop 将 s3 中的.csv 文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的 max_allowed_packet 参数。
{1}
`#!/bin/bash`
{1}
`JDBCURL=$1`
{1}
`DBUSER=$2`
{1}
`DBPASS=$3`
{1}
`SQOOPFILE=$4`
{1}
`DATE=$5`
{1}
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
{1}
7. 准备 emrconf.json 脚本
{1}
作用是使得创建起来的 EMR 集群的 hive 元数据存储在外部数据库中。注意: 根据准备工作中创建的数据库来修改文件中的 ConnectionUserName、ConnectionPassword、ConnectionURL 三个参数。
{1}
`[`
{1}
`{"Classification":"hive-site",`
{1}
`"Properties":{`
{1}
`"javax.jdo.option.ConnectionUserName":"username",`
{1}
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
{1}
`"javax.jdo.option.ConnectionPassword":"password",`
{1}
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`}`
{1}
`]`
{1}
注意:如果是在北京 Region, emrconf.json 使用以下脚本
{1}
`[`
{1}
`{"Classification":"hive-site",`
{1}
`"Properties":{`
{1}
`"javax.jdo.option.ConnectionUserName":"username",`
{1}
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
{1}
`"javax.jdo.option.ConnectionPassword":"password",`
{1}
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`},`
{1}
`{"Classification":"presto-connector-hive",`
{1}
`"Properties":{`
{1}
`"hive.s3.pin-client-to-current-region":"true"`
{1}
`},`
{1}
`"Configurations":[]`
{1}
`}`
{1}
`]`
{1}
8. 准备 jar 包
{1}
需要两个 jar,一个是 script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
{1}
另一个是 mysql 的 JDBC 驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
{1}
9. 上传文件
{1}
在 s3 中创建 s3://testcloudfrontlog/conf/ 目录,并将第 8 步中的两个 jar 包,以及 cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json, 上传到该目录。
{1}
由于 emrconf.json 需要通过 http 的方式访问到, 在 s3 中将 emrconf.json 的访问权限增加“所有人可下载”。
{1}
cloudfront 日志文件 copy 到 s3://testcloudfrontlog/log 目录下, 两个示例文件下载地址:
{1}
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
{1}
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
{1}
如果在手动流程中已经上传了日志文件,则不必再上传。
{1}
第三步,执行程序
{1}
暂时将主程序 loganaly.sh 中的 DATE 和 DATEE 参数修改为示例数据的时间,例如, 分别写成 20160711 和 2016-07-11,在任意一个 Linux 系统中运行主程序脚本 loganaly.sh(例如,可以使用 EC2 实例)。但需注意:
{1}
1) 该机器需要安装了 AWS 命令行工具,并具有 s3 和 EMR 的操作权限。
{1}
2) 该机器能访问到存储数据结果的数据库, 因为 loganaly.sh 中有对该数据库的操作。
{1}
集群创建成功后,自动执行每个 step 中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个 Step 的执行:
{1}
!{1}
{1}
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
{1}
!{1}
{1}
如果要想每天执行 loganaly.sh 脚本并对前一天的数据进行处理和分析,将 loganaly.sh 中的 DATE 和 DATEE 分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个 crontab 定时任务,每天定时执行 loganaly.sh.
{1}
如果是在 AWS 中国以外的 region 执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将 loganaly.sh 中对创建 EMR 集群的脚本稍做修改,增加 BidPrice 参数:
{1}
`--instance-groups \`
{1}
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
{1}
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
{1}
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
{1}
第四步,删除资源
{1}
为避免额外花费,删除本实验过程中 (包括手动过程以及自动过程中) 创建的资源,S3, EC2 等资源。
{1}
#### 总结
{1}
本文中使用 Cloudfront 日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了 EMR 中的 Hive,Presto,Sqoop 工具,但 EMR 还有更多的工具 (例如 Spark) 可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
{1}
** 作者介绍:**
{1}
!{1}
{1}
韩小勇
{1}
AWS 解决方案架构师,负责基于 AWS 的云计算方案架构咨询和设计,实施和推广,在加入 AWS 之前,从事电信核心网系统上云的方案设计及标准化推广 。
{1}

本文转载自 AWS 技术博客。

原文链接:
https://amazonaws-china.com/cn/blogs/china/amazon-emr/

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布