阿里云「飞天发布时刻」2024来啦!新产品、新特性、新能力、新方案,等你来探~ 了解详情
写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2016-05-24 17:454786
用户头像

发布了 43 篇内容, 共 27.7 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

谈谈PhxSQL的设计和实现哲学(上)

OpenIM

Pandas教程-4-DataFrame数据筛选(中)

Peter

Python 数据分析 pandas

mycat入门:简介和安装

小鲍侃java

9月日更

☕【JVM技术指南】「难点-核心-遗漏」TLAB内存分配+锁的碰撞(技术串烧)!

洛神灬殇

JVM TLAB 锁升级 内存分配 9月日更

Tapdata肖贝贝:实时数据引擎系列(三) - 流处理引擎对比

tapdata

谈谈PhxSQL的设计和实现哲学(下)

OpenIM

拒不外传!阿里内部耗重金找人总结出这份并发编程手册(全彩版)

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

源码 | 解析 Redo Log 实现方式

RadonDB

MySQL 数据库 RadonDB

Pandas教程-2-10种方式创建DataFrame

Peter

Python 机器学习 pandas

硬科技热度有增无减,现在入局能否搭上赛道快车?

创业邦

震撼!多名阿里资深专家联合撰写深入理解Redis设计源码手册

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

Pandas教程-3-DataFrame数据筛选(上)

Peter

Python 数据分析 pandas

源码大放送:基于Pyecharts的苏州旅游攻略

Peter

Python 数据分析 爬虫

Application.mk

Changing Lin

9月日更

【GaussDB精品课第1期】GaussDB(for openGauss)数据库,打造自研世界级产品

华为云数据库小助手

GaussDB 课程 GaussDB(for openGauss) 华为云视频 华为云数据库

炸裂!阿里十年老兵总结出SpringCloud入门到实战手册

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

合约量化策略系统搭建,合约策略交易软件开发

云行·数治·慧用丨边缘云一体机赋能基层实现边缘侧数据智能

浪潮云

云计算

五岳核心版上线!这份阿里开发手册核心版又将被多少人疯狂转载?

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

我靠!都金三银四了还有人没看过阿里这份Java面试核心手册?

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

configparser 配置文件解析器

林十二XII

Pandas教程-1-Series类型数据

Peter

Python 数据分析 pandas

极狐GitLab 和 ArgoCD 的集成实践

极狐GitLab

Kubernetes gitlab 极狐GitLab ArgoCD

二叉树层次遍历及应用

高性能架构探索

面试 二叉树 遍历

遭GitHub封杀!百万人竟跪求这份阿里内部Java面试手册

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

元宇宙基础设施提供商【时空云】将承办【博鳌亚洲论坛区块链分论坛】

时空云

分布式存储 IPFS Filecoin 元宇宙

第一波场DAPP系统搭建|DAPP介绍

Geek_23f0c3

DAPP智能合约交易系统开发 波场链DAPP开发 第一波场

史上最强!这份在各大平台获百万推荐的Java核心手册实至名归

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

上线几小时下载量破百万!无价的这份阿里并发编程图册就这么强势

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

终于有阿里P8从开发、运维两个角度总结出了Redis实战手册

公众号_愿天堂没有BUG

Java 编程 程序员 架构 面试

支持HDMI-IN接口的安卓工控主板有哪些?

双赞工控

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章