用实例讲解 Spark Sreaming

阅读数:8617 2016 年 5 月 12 日 02:56

本篇文章用 Spark Streaming +Hbase 为列,Spark Streaming 专为流式数据处理,对 Spark 核心 API 进行了相应的扩展。

首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming 对 Spark 核心 API 进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:

  • 网站监控和网络监控;
  • 异常监测;
  • 网页点击;
  • 广告数据;

物联网(IOT)

用实例讲解Spark Sreaming

图 1

Spark Streaming 支持的数据源包括 HDFS 文件,TCP socket,Kafka,Flume,Twitter 等,数据流可以通过 Spark 核心 API、DataFrame SQL 或者机器学习 API 处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持 Hadoop 输出格式的形式。

Spark Streaming 以 X 秒(batch size)为时间间隔把数据流分割成 Dstream,组成一个 RDD 序列。你的 Spark 应用处理 RDD,并把处理的结果批量返回。

用实例讲解Spark Sreaming

图 2

用实例讲解Spark Sreaming

图 3

Spark Streaming 例子代码分下面几部分:

- 读取流式数据;

- 处理流式数据;

- 写处理结果倒 Hbase 表。

Spark 处理部分的代码涉及到如下内容:

  • 读取 Hbase 表的数据;
  • 按天计算数据统计;
  • 写统计结果到 Hbase 表,列簇:stats。

数据集来自油泵信号数据,以 CSV 格式存储在指定目录下。Spark Streaming 监控此目录,CSV 文件的格式如图 3。

用实例讲解Spark Sreaming

图 4

采用 Scala 的 case class 来定义数据表结构,parseSensor 函数解析逗号分隔的数据。

流式处理的 Hbase 表结构如下:

  • 油泵名字 + 日期 + 时间戳 组合成 row key;
  • 列簇是由输入数据列、报警数据列等组成,并设置过期时间。
  • 每天等统计数据表结构如下:
  • 油泵名和日期组成 row key;

列簇为 stats,包含列有最大值、最小值和平均值;

用实例讲解Spark Sreaming

图 5

配置写入 Hbase 表

Spark 直接用 TableOutputFormat 类写数据到 Hbase 里,跟在 MapReduce 中写数据到 Hbase 表一样,下面就直接用 TableOutputFormat 类了。

Spark Streaming 代码

Spark Streaming 的基本步骤:

  • 初始化 Spark StreamingContext 对象;
  • 在 DStream 上进行 transformation 操作和输出操作;
  • 开始接收数据并用 streamingContext.start();
  • 等待处理停止,streamingContext.awaitTermination()。

创建 StreamingContext 对象,StreamingContext 是 Spark Streaming 处理的入口,这里设置 2 秒的时间间隔。

复制代码
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

接下来用 StreamingContext 的 textFileStream(directory) 创建输入流跟踪 Hadoop 文件系统的新文件,并处理此目录下的所有文件,这里 directory 指文件目录。

复制代码
// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

linesDStream 是数据流,每条记录是按行记录的 text 格式。

用实例讲解Spark Sreaming

图 6

接下来进行解析,对 linesDStream 进行 map 操作,map 操作是对 RDD 应用 Sensor.parseSensor 函数,返回 Sensor 的 RDD。

复制代码
// parse each line of data in linesDStream into sensor objects
val sensorDStream = linesDStream.map(Sensor.parseSensor)

用实例讲解Spark Sreaming

图 7

对 DStream 的每个 RDD 执行 foreachRDD 方法,使用 filter 过滤 Sensor 中低 psi 值来创建报警,使用 Hbase 的 Put 对象转换 sensor 和 alter 数据以便能写入到 Hbase。然后使用 PairRDDFunctions 的 saveAsHadoopDataset 方法将最终结果写入到任何 Hadoop 兼容到存储系统。

复制代码
// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
// filter sensor data for low psi
val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)
// convert sensor data to put object and write to HBase Table CF data
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
// convert alert to put object write to HBase Table CF alerts
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

sensorRDD 经过 Put 对象转换,然后写入到 Hbase。

用实例讲解Spark Sreaming

图 8

通过 streamingContext.start() 显式的启动数据接收,然后调用 streamingContext.awaitTermination() 来等待计算完成。

复制代码
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()

现在开始读取 Hbase 的 sensor 表,计算每条的统计指标并把对应的数据写入 stats 列簇。

用实例讲解Spark Sreaming

图 9

下面的代码读取 Hbase 的 sensor 表 psi 列数据,用 StatCounter 计算统计数据,然后写入 stats 列簇。

复制代码
// configure HBase for reading
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
// scan data column family psi column
conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")
// Load an RDD of (row key, row Result) tuples from the table
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// transform (row key, row Result) tuples into an RDD of Results
val resultRDD = hBaseRDD.map(tuple => tuple._2)
// transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
val keyValueRDD = resultRDD.
map(result => (Bytes.toString(result.getRow()).
split(" ")(0), Bytes.toDouble(result.value)))
// group by rowkey , get statistics for column value
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list => StatCounter(list))
// convert rowkey, stats to put and write to hbase table stats column family
keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下面的流程图显示 newAPIHadoopRDD 输出,(row key,result)的键值对。PairRDDFunctions 的 saveAsHadoopDataset 方法把 Put 对象存入到 Hbase。

用实例讲解Spark Sreaming

图 10

运行 Spark Streaming 应用跟运行 Spark 应用类似,比较简单,此处不赘述,参见 Spark Streaming 官方文档

收藏

评论

微博

用户头像
发表评论

注册/登录 InfoQ 发表评论