写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:455418
用户头像

发布了 43 篇内容, 共 30.6 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

2022上半年PMP考试通过率得多低,才能换来一次免费补考机会

索隆

将使用回调函数作为参数的函数改造为返回 Promise 的一个具体例子

汪子熙

JavaScript web开发 Promise 异步编程 8月月更

本周四晚19:00知识赋能第七期第2课丨OpenHarmony WiFi扫描仪UX设计

OpenHarmony开发者

Open Harmony

字节跳动嵌入式数据分析最佳实践

字节跳动数据平台

字节跳动 数据分析 BI 嵌入式分析 数据看板

个推TechDay直播预告 | 8月24日晚19:30,实时数仓搭建保姆级教程开课

个推

数据仓库 实时数仓 Flink 平台

GitHub破百万访问的阿里神作:并发实现原理JDK源码笔记

冉然学Java

Java 编程 jdk 源码刨析 JDK 1.5

浅谈云上攻防系列——云IAM原理&风险以及最佳实践

腾讯安全云鼎实验室

安全攻防 云安全 安全研究

罗技产品究竟能不能带来便捷感

Amazing_eve

#开源

openEuler代码贡献之星:麒麟软件裴建康

openEuler

开发者 成长 代码规范 openEuler 开源社区

如何编写有效的常见问题解答页面?

Geek_da0866

渲染与云渲染——渲染行业的新趋势

Finovy Cloud

云渲染 GPU算力

有关Java性能优化,这是我见过阿里大佬总结的最全的一份实战文档了

程序员小毕

Java 程序员 面试 程序人生 性能优化

从函数计算到 Serverless 架构

阿里巴巴中间件

阿里云 开源 Serverless 云原生

开源贡献者计划 2022 第二期正式启动!争做战“码”先锋!

InfoQ写作社区官方

开源 OpenHarmony 热门活动

大专的我狂刷29天“阿里内部面试笔记”最终直接斩获十七个Offer

收到请回复

Java 程序员 阿里 面试八股文 Java面试八股文

短视频源码APP开发——短视频的功能

开源直播系统源码

软件开发 直播系统源码 短视频直播系统

避免 10 大 NGINX 配置错误(下)

NGINX开源社区

nginx 配置 配置分析 故障排除

「望繁信科技」完成过亿元A+轮融资,全面加速流程智能产品建设

望繁信科技

如何开发一款基于 vite+vue3 的在线表格系统(下)

葡萄城技术团队

Vue 前端 vite

Java架构岗9大性能优化经验总结,我不允许你不会

程序员小毕

Java 数据库 程序员 面试 程序人生

从程序员到架构师,阿里巴巴2022全新出品Java程序员“成长笔记”满足了我的所有幻想

Java全栈架构师

Java 程序员 面试 后端 架构师

程序员面试太卷?我选择背这份阿里最新Java面试八股文(详解版)

Java面试那些事儿

Java 面试 Java 面试 java程序员 java 编程

多线程+JVM+设计模式+数据库,阿里巴巴Java性能优化全解实战笔记真香

Java永远的神

Java 数据库 程序员 面试 多线程

【Java】:数组的创建、赋值、访问以及长度

翼同学

Java 学习 编程语言 分享 8月月更

泄露了,Alibaba697页的MySQL应用实战与性能调优手册,太强了

冉然学Java

Java MySQL 编程 性能优化 构架

技术团队管理者的三十六计

申屠鹏会

团队管理

合作再升级!云原生加速器成员企业云霁科技获得阿里云产品生态集成认证

阿里巴巴中间件

阿里云 云原生 合作 阿里云云原生加速器

EMAS Serverless到底有多便利?

hum建应用专家

云计算 Serverless emas

OpenYurt 邀你共赴 2022 EdgeX 中国挑战赛!

阿里巴巴中间件

阿里云 云原生 openyurt 边缘容器

架构设计文档模板

maybe

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章