Formación Hadoop Forum

Sentiment Analysis with Apache Spark MLlib
by Admin Formación Hadoop - Tuesday, 28 March 2017, 16:18

Twitter Sentiment Analysis with Spark MLlib

  • Apache Spark MLlib's implementation of the Naive Bayes classifier is used to classify tweets in real time.
  • Training is performed on the 1.6-million-tweet training dataset made available by Sentiment140.
  • The model created by Naive Bayes is applied in real time to tweets retrieved via the Twitter Streaming API to determine the sentiment of each tweet.
  • We also compare this result with Stanford CoreNLP's sentiment prediction.
  • Both approaches classify each tweet as:
    • Positive
    • Neutral
    • Negative
  • Please note that all non-English tweets are classified as "neutral", because our training data consists only of English-language tweets.
  • We process only tweets that carry location information and discard tweets without it.
    • This enables the visualization based on each tweet's latitude and longitude.
  • The application can also save the compressed raw tweets to disk.
    • Set the SAVE_RAW_TWEETS flag to true in application.conf if you want to retain the raw tweets retrieved from Twitter.
  • The result for each tweet is published to Redis, to which the front-end webapp subscribes for visualization.
  • Datamaps (based on D3.js) is used to display each tweet's location on a world map, with a pop-up showing more details on hover.
    • Hover over the bubbles to see additional information about the tweets.
    • The visualization is fully responsive and scales well to any form factor; it even works on mobile.
    • The app adjusts when the window is resized, without affecting the UX or losing the data already on screen.
    • Changing the orientation of a phone or tablet does not affect the app either.
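The per-tweet rules above (discard tweets without a location, report non-English tweets as neutral, otherwise map the model's prediction to one of three labels) can be sketched in plain Scala. This is a minimal sketch under stated assumptions, not the project's code: the `SimpleTweet` case class and `routeTweet` function are hypothetical, and it assumes the model emits Sentiment140-style labels (0 = negative, 4 = positive).

```scala
object TweetRoutingSketch {
  sealed trait Sentiment
  case object Positive extends Sentiment
  case object Neutral extends Sentiment
  case object Negative extends Sentiment

  // Hypothetical, simplified tweet; the streaming job would read these fields from the Twitter API objects.
  final case class SimpleTweet(text: String, lang: String, latLon: Option[(Double, Double)])

  // Assumes Sentiment140-style labels from the model: 0.0 = negative, 4.0 = positive, anything else = neutral.
  def labelToSentiment(label: Double): Sentiment =
    if (label == 0.0) Negative
    else if (label == 4.0) Positive
    else Neutral

  // None = tweet discarded because it has no location.
  // Non-English tweets are reported as Neutral without calling the model at all.
  def routeTweet(tweet: SimpleTweet, predict: String => Double): Option[Sentiment] =
    tweet.latLon.map { _ =>
      if (tweet.lang != "en") Neutral
      else labelToSentiment(predict(tweet.text))
    }
}
```

Passing the model in as a plain `String => Double` function keeps these rules testable without a SparkContext; in the streaming job it would wrap the trained model's prediction on the tweet's features.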

Code walkthrough

Note: For brevity, this project is written in Scala. Spark itself is written in Scala and has first-class support for it, so it's preferable to write Spark jobs in Scala unless you need scipy, numpy or some very useful R packages. Writing Spark jobs in Java can be very painful; a bit of effort spent learning Scala will take you much further than being stuck with Java.

  1. Configuration
  2. Machine learning
    • Create the model using Naive Bayes classifier
    • Validate the accuracy of the model
  3. Setting up Twitter App OAuth credentials
  4. Spark Streaming job for sentiment analysis of tweets
  5. Visualization of tweets


Please check the "Spark-MLlib-Twitter-Sentiment-Analysis" README for detailed instructions on running the Docker image, triggering the Spark jobs, etc.


The properties file application.conf holds all the configurable properties required to run this project.

Twitter App OAuth credentials

  • The only manual step required in this project is setting up a Twitter App and updating the OAuth credentials used to connect to the Twitter Streaming API. This step is critical: without it, Spark cannot connect to Twitter or retrieve tweets via the Twitter Streaming API, and the visualization will stay empty.
  • Open application.conf and fill in your own OAuth values, which you can find on the Twitter Developer page for your app.
    • If you have not created a Twitter App before, create a new one on the Twitter Developer page; it provides all the values required in application.conf.
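twitter4j, the library Spark's Twitter receiver uses, can read OAuth credentials from the `twitter4j.oauth.*` system properties. The sketch below copies the four credentials into those properties and fails fast if one is missing, rather than letting the stream come up silently empty. The application.conf key names and the `TwitterOAuthSketch` object are hypothetical, and a plain `Map` stands in for the parsed config.

```scala
object TwitterOAuthSketch {
  // Hypothetical application.conf key names mapped to the system properties twitter4j reads.
  val keyToProperty: Map[String, String] = Map(
    "CONSUMER_KEY"        -> "twitter4j.oauth.consumerKey",
    "CONSUMER_SECRET"     -> "twitter4j.oauth.consumerSecret",
    "ACCESS_TOKEN_KEY"    -> "twitter4j.oauth.accessToken",
    "ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret"
  )

  // Validates all credentials first, then sets them; throws IllegalArgumentException if any is missing or blank.
  def setOAuthProperties(conf: Map[String, String]): Unit = {
    val missing = keyToProperty.keys.filter(k => conf.get(k).forall(_.trim.isEmpty)).toList.sorted
    require(missing.isEmpty, s"Missing Twitter OAuth values in application.conf: ${missing.mkString(", ")}")
    keyToProperty.foreach { case (key, prop) => System.setProperty(prop, conf(key).trim) }
  }
}
```

With these properties set, calling `TwitterUtils.createStream(ssc, None)` from the spark-streaming-twitter module should fall back to twitter4j's default configuration, which picks them up.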

Create the ML model

For this section, please check the source code of SparkNaiveBayesModelCreator.scala and MLlibSentimentAnalyzer.scala of this project on GitHub.

  • The first step in this project is creating the model using the Naive Bayes classifier. To do this, we pass the training dataset downloaded from the Sentiment140 website through Apache Spark MLlib's implementation of the Naive Bayes classifier.
  • To do anything in Spark, we first have to create a SparkContext, which is the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster.
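  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.serializer.KryoSerializer
  def createSparkContext(): SparkContext = {
    // Kryo is faster and more compact than the default Java serialization.
    val conf = new SparkConf().set("spark.serializer", classOf[KryoSerializer].getCanonicalName)
    SparkContext.getOrCreate(conf) // reuses an already running SparkContext, if any
  }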
  • We also load the stop words from a file on the classpath and broadcast the list so that it is available to the Spark executors. Removing stop words reduces noise in the signal, since very frequent words might otherwise skew the sentiment.
  • Sentiment140 is a project by Stanford students who created a training dataset of 1.6 million tweets with the columns "polarity", "id", "date", "query", "user" and "status".
  • The next step is to load the Sentiment140 file, retain only the columns we are interested in (i.e. polarity and the tweet text, status) and discard the rest.
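  import org.apache.spark.SparkContext
  import org.apache.spark.sql.DataFrame

  def loadSentiment140File(sc: SparkContext, sentiment140FilePath: String): DataFrame = {
    val sqlContext = SQLContextSingleton.getInstance(sc)
    val tweetsDF = sqlContext.read
      .format("com.databricks.spark.csv") // spark-csv package on Spark 1.x; Spark 2.x maps it to the built-in CSV source
      .option("header", "false")
      .option("inferSchema", "true")
      .load(sentiment140FilePath)
      .toDF("polarity", "id", "date", "query", "user", "status")

    // Drop the columns we are not interested in.
    tweetsDF.drop("id").drop("date").drop("query").drop("user")
  }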
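To make the projection concrete without Spark, here is a plain-Scala sketch of the same step applied to one raw Sentiment140 line: it keeps only polarity and status. It assumes every field is wrapped in double quotes and separated by commas, as in the published CSV files; `Sentiment140LineSketch.parseLine` is hypothetical and far less robust than a real CSV reader.

```scala
object Sentiment140LineSketch {
  // Column order of the Sentiment140 CSV files, as listed above.
  val columns: Seq[String] = Seq("polarity", "id", "date", "query", "user", "status")

  // Hypothetical helper: returns (polarity, status), or None for malformed lines.
  // The split limit keeps any "," sequence inside the tweet text as part of the last field.
  def parseLine(line: String): Option[(Int, String)] = {
    val fields = line.trim.stripPrefix("\"").stripSuffix("\"").split("\",\"", columns.length)
    if (fields.length != columns.length) None
    else scala.util.Try(fields(0).toInt).toOption.map(polarity => (polarity, fields(5)))
  }
}
```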
  • MLlib's Naive Bayes works with LabeledPoint, a class that represents the label and the features of a data point.
    • NaiveBayes.train takes an RDD of LabeledPoint, an optional smoothing parameter lambda (default 1.0) and an optional model type (default "multinomial") as input, and outputs a NaiveBayesModel, which can be used for evaluation and prediction.
    • In our case, each LabeledPoint uses the tweet's polarity as its label and, as its features, the vector created by transforming the tweet text with the HashingTF class.
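  import org.apache.spark.SparkContext
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.sql.{DataFrame, Row}

  def createAndSaveNBModel(sc: SparkContext, stopWordsList: Broadcast[List[String]]): Unit = {
    val tweetsDF: DataFrame = loadSentiment140File(sc, PropertiesLoader.sentiment140TrainingFilePath)
    // Turn each (polarity, status) row into a LabeledPoint: label = polarity, features = hashed term frequencies.
    val labeledRDD = tweetsDF.select("polarity", "status").rdd.map {
      case Row(polarity: Int, tweet: String) =>
        val tweetInWords: Seq[String] = MLlibSentimentAnalyzer.getBarebonesTweetText(tweet, stopWordsList.value)
        LabeledPoint(polarity.toDouble, MLlibSentimentAnalyzer.transformFeatures(tweetInWords))
    }

    val naiveBayesModel: NaiveBayesModel = NaiveBayes.train(labeledRDD, lambda = 1.0, modelType = "multinomial")
    naiveBayesModel.save(sc, PropertiesLoader.naiveBayesModelPath)
  }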
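`getBarebonesTweetText` and `transformFeatures` live in MLlibSentimentAnalyzer.scala and are not shown in this post. The plain-Scala sketch below suggests what such helpers could look like, under these assumptions: stop words are read one per line from a classpath resource (in the Spark job the resulting collection would then be shared with `sc.broadcast(...)`); the cleaning rules (lower-casing, stripping URLs, @mentions and non-letters) are hypothetical; and the feature step is a hand-rolled hashing trick that returns a sparse map instead of the `Vector` that MLlib's `HashingTF` produces. Because it hashes with `String.hashCode`, its bucket indices are not expected to match MLlib's.

```scala
import scala.io.Source

object TweetFeaturesSketch {
  // Hypothetical: reads one stop word per line from a classpath resource.
  def loadStopWords(resource: String): Set[String] = {
    val stream = getClass.getResourceAsStream("/" + resource)
    require(stream != null, s"$resource not found on the classpath")
    val source = Source.fromInputStream(stream, "UTF-8")
    try source.getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
    finally source.close()
  }

  // Hypothetical cleaning rules: lower-case, strip URLs and @mentions,
  // keep letters only, then drop stop words and one-letter tokens.
  def cleanTweet(text: String, stopWords: Set[String]): Seq[String] =
    text.toLowerCase
      .replaceAll("https?://\\S+", " ")
      .replaceAll("@\\w+", " ")
      .replaceAll("[^a-z\\s]", " ")
      .split("\\s+")
      .toSeq
      .filter(w => w.length > 1 && !stopWords.contains(w))

  // Hashing trick: map each word to a bucket in [0, numFeatures) and count occurrences per bucket.
  // 1 << 20 mirrors the default size of MLlib's HashingTF.
  def termFrequencies(words: Seq[String], numFeatures: Int = 1 << 20): Map[Int, Double] =
    words.foldLeft(Map.empty[Int, Double]) { (acc, word) =>
      val raw = word.hashCode % numFeatures
      val index = if (raw < 0) raw + numFeatures else raw // non-negative modulo
      acc.updated(index, acc.getOrElse(index, 0.0) + 1.0)
    }
}
```

For a tweet string `text`, `termFrequencies(cleanTweet(text, stopWords))` yields the sparse counts that, in the MLlib version, become the feature vector of a LabeledPoint.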
  • After Spark completes training, the model is saved to disk at the location defined in the application.conf properties file.
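With modelType = "multinomial", MLlib's NaiveBayes computes, for each label c, a smoothed log prior log((n_c + λ) / (N + K·λ)) and smoothed log term probabilities log θ_cj = log((t_cj + λ) / (T_c + D·λ)). Here n_c is the number of training tweets with label c, N the total number of tweets, K the number of labels, t_cj the summed frequency of feature j within label c, T_c the sum of all feature frequencies within label c, and D the number of features. Prediction picks the label with the largest log prior plus Σ_j x_j · log θ_cj. The sketch below re-implements that arithmetic on small in-memory data so the role of `lambda` can be inspected without Spark; `TinyMultinomialNB` is a hypothetical illustration, not MLlib's code.

```scala
object TinyMultinomialNB {
  // A training example: label plus sparse term-frequency features (index -> count).
  final case class Example(label: Double, features: Map[Int, Double])

  final case class Model(labels: Vector[Double], logPi: Vector[Double], logTheta: Vector[Array[Double]])

  def train(data: Seq[Example], numFeatures: Int, lambda: Double = 1.0): Model = {
    val byLabel = data.groupBy(_.label).toVector.sortBy(_._1)
    val numLabels = byLabel.size
    val logPiDenom = math.log(data.size + numLabels * lambda)
    val (labels, logPi, logTheta) = byLabel.map { case (label, examples) =>
      // Sum the term frequencies of all examples with this label.
      val termSums = Array.fill(numFeatures)(0.0)
      for (ex <- examples; (j, v) <- ex.features) termSums(j) += v
      val logThetaDenom = math.log(termSums.sum + numFeatures * lambda)
      (label,
        math.log(examples.size + lambda) - logPiDenom,           // smoothed log prior
        termSums.map(t => math.log(t + lambda) - logThetaDenom)) // smoothed log term probabilities
    }.unzip3
    Model(labels, logPi, logTheta)
  }

  // Picks the label maximizing log pi_c + sum_j x_j * log theta_cj.
  def predict(model: Model, features: Map[Int, Double]): Double = {
    val scores = model.labels.indices.map { c =>
      model.logPi(c) + features.toSeq.map { case (j, x) => x * model.logTheta(c)(j) }.sum
    }
    model.labels(scores.indexOf(scores.max))
  }
}
```

Because λ is added to every count, a word never seen with a label still gets a small non-zero probability instead of driving that label's score to minus infinity.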

Validate the accuracy of the model

  • We quickly validate the accuracy of the model by running it against a test dataset provided by the Sentiment140 website. application.conf is already configured with the location of this file. The file has exactly the same columns as the training data, so again we retain only the columns we are interested in.
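  import org.apache.spark.SparkContext
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.mllib.classification.NaiveBayesModel
  import org.apache.spark.sql.{DataFrame, Row}

  def validateAccuracyOfNBModel(sc: SparkContext, stopWordsList: Broadcast[List[String]]): Unit = {
    val naiveBayesModel: NaiveBayesModel = NaiveBayesModel.load(sc, PropertiesLoader.naiveBayesModelPath)

    val tweetsDF: DataFrame = loadSentiment140File(sc, PropertiesLoader.sentiment140TestingFilePath)
    // Pair each tweet's actual polarity with the model's prediction: (actual, predicted).
    val actualVsPredictionRDD = tweetsDF.select("polarity", "status").rdd.map {
      case Row(polarity: Int, tweet: String) =>
        val tweetText = replaceNewLines(tweet)
        val tweetInWords: Seq[String] = MLlibSentimentAnalyzer.getBarebonesTweetText(tweetText, stopWordsList.value)
        (polarity.toDouble, naiveBayesModel.predict(MLlibSentimentAnalyzer.transformFeatures(tweetInWords)))
    }
    // Percentage of tweets whose predicted polarity equals the actual one.
    val accuracy = 100.0 * actualVsPredictionRDD.filter(x => x._1 == x._2).count() / tweetsDF.count()
    println(f"""\n\t<==******** Prediction accuracy compared to actual: $accuracy%.2f%% ********==>\n""")
    saveAccuracy(sc, actualVsPredictionRDD)
  }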
  • As this dataset already contains the polarity of each tweet, we can compare that value with the prediction our model computes from the tweet text. In my testing, accuracy ranged from a low of 58.5% to a high of 79.3%. The prediction accuracy clearly needs to be much higher than its current level.
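The accuracy above is simply the share of (actual, predicted) pairs that agree. The following hypothetical local version adds a per-label breakdown, which can show where accuracy is lost; for example, a label that occurs in the test data but never in the training data can never be predicted correctly. `AccuracySketch` and its functions are illustrative names, not part of the project.

```scala
object AccuracySketch {
  // Percentage of (actual, predicted) pairs that agree, like the RDD expression above but on a local Seq.
  def accuracyPercent(actualVsPredicted: Seq[(Double, Double)]): Double =
    if (actualVsPredicted.isEmpty) 0.0
    else 100.0 * actualVsPredicted.count { case (actual, predicted) => actual == predicted } / actualVsPredicted.size

  // For each actual label: (number predicted correctly, number of examples with that label).
  def perLabelHits(actualVsPredicted: Seq[(Double, Double)]): Map[Double, (Int, Int)] =
    actualVsPredicted.groupBy(_._1).map { case (label, pairs) =>
      (label, (pairs.count { case (a, p) => a == p }, pairs.size))
    }
}
```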

Spark Streaming job for sentiment analysis of tweets

Please refer to TweetSentimentAnalyzer.scala, MLlibSentimentAnalyzer.scala and CoreNLPSentimentAnalyzer.scala for the code for this section.

  • We use the Twitter App OAuth credentials to connect to Twitter with Spark Streaming and retrieve tweets in real time.
  • The Naive Bayes model saved in the previous step is loaded into memory, and the tweets we receive from Twitter are passed to it. The model immediately predicts the sentiment of each tweet as Positive, Negative or Neutral based on its text.
  • We also determine the sentiment as detected by Stanford CoreNLP.
  • Finally, we publish the following information to a Redis channel:
    1. tweet handle
    2. tweet profile pic
    3. date tweet created
    4. text of the tweet
    5. sentiment predicted by MLlib
    6. sentiment as per Stanford CoreNLP
    7. Latitude and Longitude
  • In addition to publishing them to Redis, we save the classified tweets to a text file.
  • As explained earlier, the raw tweets (in JSON format) can also be saved.
    • Please note that all data is saved to disk in compressed format to save disk space.
  • Although the spark-redis package exists, as of this writing it does not support pub/sub from Spark Streaming. So we resorted to Jedis, which is really easy to use and has a small footprint.
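Publishing to Redis boils down to turning each classified tweet into one message string. The sketch below flattens the items listed above (with latitude and longitude as two separate fields) into a single tab-separated line. The case class, the field names and the tab delimiter are assumptions, not the project's actual message format.

```scala
object RedisPayloadSketch {
  // Hypothetical record holding the fields listed above.
  final case class ClassifiedTweet(
      handle: String,
      profileImageUrl: String,
      createdAt: String,
      text: String,
      mllibSentiment: String,
      coreNlpSentiment: String,
      latitude: Double,
      longitude: Double)

  // Hypothetical field separator; clean() guarantees it never occurs inside a text field.
  val Delimiter = "\t"

  // Collapse any whitespace run (including tabs and newlines) into a single space.
  private def clean(s: String): String = s.replaceAll("\\s+", " ").trim

  // Flattens one classified tweet into a single-line message for the Redis channel.
  def toMessage(t: ClassifiedTweet): String =
    (Seq(t.handle, t.profileImageUrl, t.createdAt, t.text, t.mllibSentiment, t.coreNlpSentiment).map(clean) ++
      Seq(t.latitude.toString, t.longitude.toString)).mkString(Delimiter)
}
```

Publishing such a message with Jedis is then a single `publish(channel, message)` call. In a Spark Streaming job this would typically run inside `foreachRDD` and `foreachPartition`, so that one Redis connection serves a whole partition rather than a single tweet.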


Git repository with the project information and code:

Another example: