写点什么

用实例讲解 Spark Sreaming

2016 年 5 月 12 日

本篇文章用 Spark Streaming +Hbase 为列,Spark Streaming 专为流式数据处理,对 Spark 核心 API 进行了相应的扩展。

首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming 对 Spark 核心 API 进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:

  • 网站监控和网络监控;
  • 异常监测;
  • 网页点击;
  • 广告数据;

物联网(IOT)

图 1

Spark Streaming 支持的数据源包括 HDFS 文件,TCP socket,Kafka,Flume,Twitter 等,数据流可以通过 Spark 核心 API、DataFrame SQL 或者机器学习 API 处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持 Hadoop 输出格式的形式。

Spark Streaming 以 X 秒(batch size)为时间间隔把数据流分割成 Dstream,组成一个 RDD 序列。你的 Spark 应用处理 RDD,并把处理的结果批量返回。

图 2


图 3

Spark Streaming 例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒 Hbase 表。

Spark 处理部分的代码涉及到如下内容:

  • 读取 Hbase 表的数据;
  • 按天计算数据统计;
  • 写统计结果到 Hbase 表,列簇:stats。

数据集来自油泵信号数据,以 CSV 格式存储在指定目录下。Spark Streaming 监控此目录,CSV 文件的格式如图 3。

图 4

采用 Scala 的 case class 来定义数据表结构,parseSensor 函数解析逗号分隔的数据。

流式处理的 Hbase 表结构如下:

  • 油泵名字 + 日期 + 时间戳 组合成 row key;
  • 列簇是由输入数据列、报警数据列等组成,并设置过期时间。
  • 每天等统计数据表结构如下:
  • 油泵名和日期组成 row key;

列簇为 stats,包含列有最大值、最小值和平均值;

图 5

配置写入 Hbase 表

Spark 直接用 TableOutputFormat 类写数据到 Hbase 里,跟在 MapReduce 中写数据到 Hbase 表一样,下面就直接用 TableOutputFormat 类了。

Spark Streaming 代码

Spark Streaming 的基本步骤:

  • 初始化 Spark StreamingContext 对象;
  • 在 DStream 上进行 transformation 操作和输出操作;
  • 开始接收数据并用 streamingContext.start();
  • 等待处理停止,streamingContext.awaitTermination()。

创建 StreamingContext 对象,StreamingContext 是 Spark Streaming 处理的入口,这里设置 2 秒的时间间隔。

复制代码
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

接下来用 StreamingContext 的 textFileStream(directory) 创建输入流跟踪 Hadoop 文件系统的新文件,并处理此目录下的所有文件,这里 directory 指文件目录。

复制代码
// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

linesDStream 是数据流,每条记录是按行记录的 text 格式。

图 6

接下来进行解析,对 linesDStream 进行 map 操作,map 操作是对 RDD 应用 Sensor.parseSensor 函数,返回 Sensor 的 RDD。

复制代码
// parse each line of data in linesDStream into sensor objects
val sensorDStream = linesDStream.map(Sensor.parseSensor)


图 7

对 DStream 的每个 RDD 执行 foreachRDD 方法,使用 filter 过滤 Sensor 中低 psi 值来创建报警,使用 Hbase 的 Put 对象转换 sensor 和 alter 数据以便能写入到 Hbase。然后使用 PairRDDFunctions 的 saveAsHadoopDataset 方法将最终结果写入到任何 Hadoop 兼容到存储系统。

复制代码
// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
// filter sensor data for low psi
val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)
// convert sensor data to put object and write to HBase Table CF data
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
// convert alert to put object write to HBase Table CF alerts
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

sensorRDD 经过 Put 对象转换,然后写入到 Hbase。

图 8

通过 streamingContext.start() 显式的启动数据接收,然后调用 streamingContext.awaitTermination() 来等待计算完成。

复制代码
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()

现在开始读取 Hbase 的 sensor 表,计算每条的统计指标并把对应的数据写入 stats 列簇。

图 9

下面的代码读取 Hbase 的 sensor 表 psi 列数据,用 StatCounter 计算统计数据,然后写入 stats 列簇。

复制代码
// configure HBase for reading
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
// scan data column family psi column
conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")
// Load an RDD of (row key, row Result) tuples from the table
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// transform (row key, row Result) tuples into an RDD of Results
val resultRDD = hBaseRDD.map(tuple => tuple._2)
// transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
val keyValueRDD = resultRDD.
map(result => (Bytes.toString(result.getRow()).
split(" ")(0), Bytes.toDouble(result.value)))
// group by rowkey , get statistics for column value
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list => StatCounter(list))
// convert rowkey, stats to put and write to hbase table stats column family
keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下面的流程图显示 newAPIHadoopRDD 输出,(row key,result)的键值对。PairRDDFunctions 的 saveAsHadoopDataset 方法把 Put 对象存入到 Hbase。

图 10

运行 Spark Streaming 应用跟运行 Spark 应用类似,比较简单,此处不赘述,参见 Spark Streaming 官方文档

2016 年 5 月 12 日 02:568971
用户头像

发布了 43 篇内容, 共 23.9 次阅读, 收获喜欢 4 次。

关注

评论

发布
暂无评论
发现更多内容

自由职业半年之后,我又滚回职场了...

王磊

程序员 程序员人生 程序人生

不懂什么是高并发?看完这篇文章你可以去吊打面试官了

互联网架构师小马

高可用 高并发 高性能 高并发系统设计 多线程与高并发

选择排序

wjchenge

2.3万个MongoDB数据库遭黑客比特币勒索,你中招了吗?中招怎么办?

墨天轮

比特币 数据库 oracle mongodb 黑客

太阳马戏团在疫情下的组合式创新

石云升

商业模式 组合式创新 思想实验

计算机操作系统基础(十)---存储管理之虚拟内存

书旅

php laravel 线程 操作系统 进程

区块链正处于手脚并用攀爬的“攻坚时刻”

CECBC区块链专委会

数据上链 市场选择

大数学家笛卡尔到底是怎么死的? |《隐秘的角落》

赵新龙

数学 隐秘的角落 笛卡尔

公司短信平台上的两万块钱,瞬间就被刷没了

古时的风筝

短信防刷 接口安全 短信轰炸机

猿灯塔:最详细Dubbo相关面试题

猿灯塔

高效程序员的七个好习惯——你有吗?

小谈

JVM Java 面试 springboot 程序员素养 SpringCloud

开篇词——你所不知道的神经网络攻防

P小二

神经网络 AIPwn 对抗样本 AI安全 P小二

十分钟带你彻底搞懂原码、反码、补码

程序员生活志

补码 原码 反码

谁没个焦虑的时段呢?

封不羁

个人成长 个人感想 程序员成长

第五周作业

武鹏

ThreadPoolExecutor 线程池使用

郭儿的跋涉

线程 多线程 线程池

四面阿里巴巴回来分享面经总结,定级P7架构师

小吴选手

架构 技术 Spring Boot 阿里 Java 面试

最详细的Java/后端学习路线

犬来八荒

Java开发连Redis都不会还想跳槽涨薪?先把Redis的知识点吃透再说

互联网架构师小马

数据库 redis 缓存 面试 找工作

数据集永久下架,微软不是第一个,MIT 也不是最后一个

神经星星

AI 计算机视觉 MIT AI 伦理 数据集

架构师训练营 - 第五课作业 -20200708- 一致性HASH

👑👑merlan

极客大学架构师训练营 一致性哈希

蟒周刊/427:机器狗已在公开发售,支持用 Python 对其编程...

ZoomQuiet大妈

Python 大妈 蟒营® 蟒周刊 101camp

2020年7月国产数据库排行:华为、腾讯发新品,中兴、阿里结硕果

墨天轮

数据库 阿里 排行榜

nightingale安装详解

曾祥斌

​ “强大基座”再展能力,一朵“云”掀起国产化浪潮

Geek_116789

业务学习-美团闪购

第519区

数据产品经理的具象化

松子(李博源)

大数据 产品经理 数据产品

了不起的 Webpack 构建流程学习指南

pingan8787

Java 前端 Web webpack

宅家复习一个月,成功入职腾讯,才知道算法实在太太太重要了

互联网架构师小马

程序员 腾讯 面试 算法 找工作

了不起的 tsconfig.json 学习指南

pingan8787

typescript 前端 Web

阿里大型企业级开发必用微服务:深入浅出SpringBoot2.x

小闫

spring jdk 后端 Java 面试 springboot

InfoQ 极客传媒开发者生态共创计划线上发布会

InfoQ 极客传媒开发者生态共创计划线上发布会

用实例讲解Spark Sreaming-InfoQ