Using the sample data from the “SMS Spam Collection v. 1”, I wrote a simple Spark program that classifies SMS messages as normal (ham) or spam.
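Each line of the collection holds a label (ham or spam), a tab, and the raw message text. The program below splits the data on exactly that prefix. The following Spark-free sketch parses one such line safely. The object name SmsParsing and the sample messages are made up for illustration.

```scala
// Illustrative sketch, not part of the original program: parse one line of the
// SMS Spam Collection, assuming the "<label>\t<message>" layout.
object SmsParsing {
  // Returns Some((label, message)) for "ham"/"spam" lines and None otherwise.
  // split("\t", 2) splits only at the first tab, so tabs inside the message are
  // kept. It also cannot throw on short or blank lines, unlike substring(0, n).
  def parse(line: String): Option[(String, String)] =
    line.split("\t", 2) match {
      case Array(label, message) if label == "ham" || label == "spam" =>
        Some((label, message))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input lines, including a blank one.
    val lines = Seq("ham\tSee you at lunch", "spam\tFree entry, reply now", "")
    lines.flatMap(parse).foreach { case (label, msg) => println(s"$label -> $msg") }
  }
}
```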

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

object SimpleRegression {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Regression")
    val sc = new SparkContext(conf)

    val smsData = sc.textFile("hdfs://127.0.0.1/user/robin/SMSSpamCollection")
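    // Each line is "<label>\t<message>". startsWith avoids the exception that
    // substring(0, n) throws on lines shorter than n characters (e.g. blank lines).
    val normal = smsData.filter(line => line.startsWith("ham\t"))
      .map(line => line.stripPrefix("ham\t"))
    val spam = smsData.filter(line => line.startsWith("spam\t"))
      .map(line => line.stripPrefix("spam\t"))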

    // Create a HashingTF instance to map message text to vectors of 100,000 features.
    val tf = new HashingTF(numFeatures = 100000)
    // Each message is split into words, and each word is hashed to one feature index.
    val spamFeatures = spam.map(msg => tf.transform(msg.split(" ")))
    val normalFeatures = normal.map(msg => tf.transform(msg.split(" ")))

    val positiveExamples = spamFeatures.map(features => LabeledPoint(100, features))
    val negativeExamples = normalFeatures.map(features => LabeledPoint(-100, features))
    val trainingData = positiveExamples.union(negativeExamples)

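    trainingData.cache() // Cache since SGD is iterative and passes over the data many times.
    // Run Linear Regression using the SGD algorithm (default settings).
    // LinearRegressionWithSGD is deprecated since Spark 2.0 in favour of
    // org.apache.spark.ml.regression.LinearRegression, but it still works here.
    val model = new LinearRegressionWithSGD().run(trainingData)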
    // Test on a positive example (spam) and a negative one (normal).
    val posTest = tf.transform(
      ("Someone has contacted our dating service and entered your phone because they fancy you").split(" "))
    val negTest = tf.transform(
      ("Hi Dady, I started studying Spark the other").split(" "))
    println("Prediction for positive test example: " + model.predict(posTest))
    println("Prediction for negative test example: " + model.predict(negTest))
  }
}
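HashingTF avoids building a vocabulary. It hashes each word to an index in [0, numFeatures) and records how often each index is hit. The plain-Scala sketch below shows the idea. It assumes String.hashCode as a stand-in hash, whereas Spark uses its own hash function (MurmurHash3 by default in Spark 2.x), so the actual indices differ. HashingSketch and its methods are hypothetical names.

```scala
// Simplified, non-distributed sketch of the hashing trick behind HashingTF.
// Assumption: String.hashCode stands in for Spark's hash function.
object HashingSketch {
  // Map a term to a bucket in [0, numFeatures) using a non-negative modulo.
  def indexOf(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Sparse term-frequency vector: bucket index -> number of terms hashed there.
  // Different words may collide in one bucket; that is the price of the trick.
  def transform(words: Seq[String], numFeatures: Int): Map[Int, Double] =
    words.groupBy(indexOf(_, numFeatures)).map { case (i, ws) => i -> ws.size.toDouble }

  def main(args: Array[String]): Unit = {
    val v = transform("call now call now".split(" ").toSeq, 100000)
    println(v)
  }
}
```

With 100,000 buckets and short messages, collisions are rare, which is why a larger numFeatures is usually a reasonable trade of memory for accuracy.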

The “build.sbt” file contains:

lazy val root = (project in file("."))
    .settings(
        name := "test",
        version := "1.0",
        scalaVersion := "2.10.6",
        unmanagedJars in Compile += file("/home/sanbai/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar"),
        libraryDependencies ++= Seq(
            "org.apache.spark" % "spark-core_2.10" % "2.0.1",
            "org.apache.spark" % "spark-hive_2.10" % "2.0.1",
            "org.apache.spark" % "spark-mllib_2.10" % "2.0.1",
            "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
            "org.apache.hadoop" % "hadoop-client" % "2.7.2",
            "org.xerial.snappy" % "snappy-java" % "1.1.2"
        )
    )
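Two optional tweaks to this build file follow. They are a suggestion, not the setup used for the run below. `%%` appends the Scala binary suffix (`_2.10`) derived from scalaVersion, so artifact names stay in sync if the Scala version changes. The "provided" scope marks Spark as supplied by the cluster at runtime. `sbt package` never bundles dependencies, so "provided" only matters when building a fat jar, for example with the sbt-assembly plugin. The sketch assumes the program needs only spark-core and spark-mllib, since it imports nothing from Hive or Streaming.

```scala
// build.sbt variant (suggestion only): %% adds the _2.10 suffix automatically,
// and "provided" keeps Spark classes out of any assembled jar.
lazy val root = (project in file("."))
  .settings(
    name := "test",
    version := "1.0",
    scalaVersion := "2.10.6",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"  % "2.0.1" % "provided",
      "org.apache.spark" %% "spark-mllib" % "2.0.1" % "provided"
    )
  )
```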

Then submit the job to YARN:

./bin/spark-submit --class SimpleRegression \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2G \
  --executor-memory 14G \
  --executor-cores 1 \
  --num-executors 1 \
  --queue spark \
  /home/sanbai/myspark/target/scala-2.10/test_2.10-1.0.jar

In cluster mode the driver runs inside YARN, so the println output ends up in the application log, which can be retrieved with:

bin/yarn logs -applicationId application_1473140384986_0096

The result is:

Prediction for positive test example: 24.238025869328453
Prediction for negative test example: -34.879236141966544
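The predictions are not ±100 because the model outputs a linear score. That score is the dot product of the learned weights with the hashed word counts, plus an intercept that is off by default. The weights are fitted by least squares toward the ±100 labels, so for an unseen message only the sign and rough size of the score are meaningful. The sketch below is a simplified, single-machine version of least-squares SGD. It runs on hypothetical toy data with two features, one spam example and one normal example. It is not Spark's implementation, which runs distributed and uses its own step-size schedule and defaults.

```scala
// Minimal sketch of least-squares linear regression trained by plain SGD
// (no intercept, fixed step size, examples visited in order).
object SgdSketch {
  type Features = Map[Int, Double] // sparse vector: index -> value

  def dot(w: Array[Double], x: Features): Double =
    x.map { case (i, v) => w(i) * v }.sum

  // Minimise (w·x - y)^2 / 2 by updating w after every single example.
  def train(data: Seq[(Double, Features)], dim: Int, epochs: Int, step: Double): Array[Double] = {
    val w = Array.fill(dim)(0.0)
    for (_ <- 1 to epochs; (y, x) <- data) {
      val err = dot(w, x) - y
      x.foreach { case (i, v) => w(i) -= step * err * v }
    }
    w
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical toy data: feature 0 = a "spammy" word, feature 1 = an everyday word.
    val data = Seq((100.0, Map(0 -> 1.0)), (-100.0, Map(1 -> 1.0)))
    val w = train(data, dim = 2, epochs = 50, step = 0.1)
    println(dot(w, Map(0 -> 1.0))) // positive score: looks like spam
    println(dot(w, Map(1 -> 1.0))) // negative score: looks normal
  }
}
```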

So we can treat a message with a negative prediction as normal and one with a positive prediction as spam (or use 10 instead of 0 as the boundary).
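The decision rule above fits in a few lines. The threshold is a parameter so that the 0 and 10 boundaries can be compared. SpamDecision and isSpam are hypothetical names.

```scala
// Minimal sketch of the decision rule: scores above the threshold count as spam.
object SpamDecision {
  def isSpam(prediction: Double, threshold: Double = 0.0): Boolean =
    prediction > threshold

  def main(args: Array[String]): Unit = {
    // The two predictions printed by the run above.
    val predictions = Seq(24.238025869328453, -34.879236141966544)
    predictions.foreach { p =>
      val label = if (isSpam(p)) "spam" else "normal"
      println(f"$p%.2f -> $label")
    }
  }
}
```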
This is just an example. The sample dataset is too small, so the model can only filter out obvious spam messages. To identify more spam, we would need to add more features, such as the topic of each message, the total number of words, or the frequency of special words.
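As one illustration of the last two suggestions, the sketch below computes the total number of words and the fraction of "special" words in a message. The word list is a hypothetical placeholder, and tokenizing on non-alphanumeric characters is an assumption. Topic features would need something like a topic model and are left out. In the Spark program, such values could be appended to the HashingTF vector before each LabeledPoint is built. They may need scaling first, since SGD is sensitive to feature magnitudes.

```scala
// Sketch of two hand-crafted features; specialWords is a hypothetical list.
object ExtraFeatures {
  val specialWords: Set[String] = Set("free", "win", "prize", "call", "txt")

  // Returns (total number of words, fraction of words that are special).
  def extract(message: String): (Double, Double) = {
    // Lower-case, then split on runs of non-alphanumeric characters.
    val words = message.toLowerCase.split("[^a-z0-9]+").filter(_.nonEmpty)
    val total = words.length.toDouble
    val special = words.count(specialWords.contains).toDouble
    (total, if (total == 0.0) 0.0 else special / total)
  }

  def main(args: Array[String]): Unit =
    println(extract("WIN a FREE prize now!"))
}
```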