我是靠谱客的博主 从容冬日,最近开发中收集的这篇文章主要介绍Spark流和Twitter情感分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

这篇博客文章是我努力向同事展示如何通过使用Apache Spark的流功能和简洁的API获得所需见解的结果。 在此博客文章中,您将学习如何进行一些简单但非常有趣的分析,这些分析将通过分析社交网络的特定区域来帮助您解决实际问题。

在本演示中,使用Twitter流的子集是完美的选择,因为它具有我们所需的一切:无穷无尽的连续数据源,可供探索。

火花流最小化

在此处以及电子书“ Apache Spark入门”的第6章中对Spark Streaming进行了很好的解释,因此,我们将跳过有关Streaming API的一些详细信息,然后继续设置我们的应用程序。

设置我们的应用

让我们先看看如何准备我们的应用程序,然后再进行其他操作。

val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

val ssc = new StreamingContext(sc, Seconds(5))

System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = TwitterUtils.createStream(ssc, None)

在这里,我们创建了Spark Context sc ,并将日志级别设置为WARN,以消除Spark生成的嘈杂日志。 我们还使用sc创建了一个Streaming Context ssc 。 然后,我们设置从Twitter网站获得的Twitter凭据(在执行此操作之前,我们需要执行以下步骤 )。 现在,真正的乐趣开始了

Twitter上的趋势现在是什么?

在任何给定时刻,很容易找出Twitter上的趋势; 只需计算流中每个标签的外观即可。 让我们看看Spark如何允许我们执行此操作。

val tags = stream.flatMap { status =>
   status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
   .foreachRDD { rdd =>
       val now = org.joda.time.DateTime.now()
       rdd
         .sortBy(_._2)
         .map(x => (x, now))
         .saveAsTextFile(s"~/twitter/$now")
     }

首先,我们从推文中获取标签,计算它(一个标签)出现了多少次,并按计数对它们进行了排序。 之后,我们将结果持久化,以便将Splunk(或与此相关的任何其他工具)指向该结果。 我们可以使用此信息构建一些有趣的仪表板,以便跟踪最热门的主题标签。 根据这些信息,我的同事可以创建广告系列并使用这些受欢迎的标签吸引更多的受众。

分析推文

现在,我们想添加功能以获得人们对一组主题的总体看法。 就本例而言,假设我们想了解有关大数据食品的推文的情感 ,这是两个非常不相关的主题。

有几种用于分析Tweets中的情感的API,但是我们将使用Stanford Natural Language Processing Group的一个有趣的库来提取相应的情感

在我们的build.sbt文件中,我们需要添加相应的依赖项。

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们只需要选择某些真正关心的Tweet,即可使用某些井号(#)过滤 。 借助统一的Spark API,此过滤非常容易。

让我们看看如何。

val tweets = stream.filter {t =>
     val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
     tags.contains("#bigdata") && tags.contains("#food")
   }

在这里,我们获取每个Tweet中的所有标签,并检查是否已使用#bigdata#food对其进行了标签。

一旦我们有了推文,提取相应的情绪就很容易了。 我们定义一个从Tweet的内容中提取情感的函数,以便将其插入到管道中。

def detectSentiment(message: String): SENTIMENT_TYPE

我们将使用此函数,并假设它会执行应做的事情,并且将其实现放在最后,因为它不是本文的重点。 为了了解其工作原理,让我们围绕它进行一些测试。

it("should detect not understood sentiment") {
     detectSentiment("") should equal (NOT_UNDERSTOOD)
}

it("should detect a negative sentiment") {
     detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}

it("should detect a neutral sentiment") {
     detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}

it("should detect a positive sentiment") {
     detectSentiment("It was a nice experience.") should equal (POSITIVE)
}

it("should detect a very positive sentiment") {
     detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

这些测试应该足以显示detectSentiment的工作方式。

让我们来看一个例子。

val data = tweets.map { status =>
   val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
   val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

   (status.getText, sentiment.toString, tags)
}

数据表示我们想要的Tweets的DStream ,相关的情感以及Tweet中的主题标签(在这里我们应该找到用于过滤的标签)。

SQL互操作性

现在,我们想将情感数据与我们可以使用SQL查询的外部数据集进行交叉引用。 对于我的同事来说,能够将Twitter流与他的其他数据集一起加入是非常有意义的。

让我们看看如何实现这一目标。

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

data.foreachRDD { rdd =>
   rdd.toDF().registerTempTable("sentiments")
}

我们已经将流转换为不同的表示形式( DataFrame ),它也受所有Spark概念(弹性,分布式,非常快)的支持,并将其公开为表,以便我的同事可以使用他钟爱的SQL查询不同的源。

与我们系统中的任何其他表一样,将查询表情感 (我们是从DataFrame中定义的)。 另一种可能性是,我们可以使用Spark SQL查询其他数据源(Cassandra,Xmls或我们自己的二进制格式的文件),并将它们与流交叉。

您可以在此处此处找到有关此主题的更多信息

接下来显示查询DataFrame的示例。

sqlContext.sql("select * from sentiments").show()

窗口操作

Spark Streaming具有回溯流的能力,这是大多数流引擎缺少的功能(如果确实具有此功能,则很难实现)。

为了实现窗口化操作,您需要检查流,但这是一个简单的任务。 您可以在此处找到有关此内容的更多信息。

这是这种操作的一个小例子:

tags    .window(Minutes(1))    . (...)

结论

即使我们的示例非常简单,我们仍然能够使用Spark解决实际问题。 现在,我们可以在Twitter上识别热门话题,这有助于我们定位并增加受众。 同时,我们能够使用一组工具(例如SQL)访问不同的数据集。

来自#bigdata#food的结果非常有趣。 也许人们在午餐时间发布有关大数据的推文,谁知道呢?

翻译自: https://www.javacodegeeks.com/2016/04/spark-streaming-twitter-sentiment-analysis.html

最后

以上就是从容冬日为你收集整理的Spark流和Twitter情感分析的全部内容,希望文章能够帮你解决Spark流和Twitter情感分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部