写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:455480
用户头像

发布了 43 篇内容, 共 30.9 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

全面解读 SQL 优化 - 统计信息

KaiwuDB

sql 优化 KaiwuDB

KubeCon China 2023 | 华为ICT开源产业与生态发展团队参会精彩回顾

科技热闻

得物API元数据中心探索与思考

得物技术

API管理 元数据中心 自动解析

Spring Boot 中常见且必备的注解解析

Liam

Java 程序员 Spring Boot 后端 注解

基于Java开发的企业人力资源管理系统(招聘、绩效、考勤、酬薪管理)

金陵老街

长沙企业采购云管平台选哪家厂商?联系电话多少?

行云管家

云计算 云服务 云管理平台 云管平台

供应链透明度的代币开发

区块链软件开发推广运营

交易所开发 dapp开发 区块链开发 NFT开发

阿里云易立:以云原生之力,实现大模型时代基础设施能力跃升 | KubeCon 主论坛分享

阿里巴巴云原生

阿里云 容器 云原生 KubeCON

支持信创系统的数据库审计系统有哪些?用哪家好?

行云管家

数据库 信创 数据安全 国产化 数据库审计

如何利用Vert.x快速开发你的应用

Kevin_913

JAVA OOM异常最佳实践

Yestodorrow

Java 运维 监控 可观测性 系统性能

一文带你走进 Linux 小工具 - tmux

KaiwuDB

Linux tmux KaiwuDB

实用帖|打破常规,巧用分层地毯的8种方法!

Finovy Cloud

C4D

三个要点,掌握Spring Boot单元测试

互联网工科生

Spring Boot 分层架构 单元测试 JUnit Mockito

什么是API商品数据接口?该怎么使用?

Noah

API 安全 API 文档 API 开发

EOS系统合约链账户介绍

BSN研习社

深圳华秋电子有限公司与共熵服务中心缔结战略合作伙伴关系

华秋电子

合作伙伴

<em> 和 <strong> 标签的区别

Lee Chen

html 前端

语音识别技术的现状及发展趋势

数据堂

一种通过延迟事务提升数据库性能的方法

天翼云开发者社区

数据库

语音识别技术:从离线到在线的转变

数据堂

华秋DFM新功能丨可焊性检查再次升级,抢先体验!

华秋电子

前端首屏优化 | 提升首屏的 8 个很简单的手段

Yestodorrow

可观测性 网站性能

与创新者同行!Doris Summit Asia 2023 完整议程公开,首届线下峰会邀你报名!

SelectDB

大数据 数据分析 Doris 峰会 数据库、

首单立减7元华为负一屏买电影票又便宜又快

最新动态

[分组聚合]基于Lucene8进行多值字段分组聚合(多属性字段)

alexgaoyh

lucene Spring Boot 分组聚合 单字段 多属性

如何用装饰者模式代理final方法

程序员万金游

spring aop #java #程序员 #Spring #后端

一种提升SQL改写效率的方法

天翼云开发者社区

数据库

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章