
In this Apache Spark machine learning example, Spark MLlib is introduced and the Scala source code analyzed. This post and the accompanying screencast videos demonstrate a custom Spark MLlib driver application, and then examine the Spark MLlib Scala source code. Many topics are shown and explained, but first, let's cover a few machine learning concepts.
Machine Learning Key Concepts
What is machine learning?
Machine learning is creating and using models that are learned from data. You might also hear machine learning referred to as predictive modeling or data mining.
What are three examples of machine learning?
- spam prediction
- fraudulent credit card transaction prediction
- a product or advertisement recommendation engine
There are two types of machine learning models: supervised and unsupervised. Supervised models are trained on data labeled with the correct answers, while unsupervised models work with unlabeled data.
Examples of Supervised machine learning models
- k-nearest neighbors: predict how a person might vote if you know how their neighbors are voting
- naive bayes: determine if an incoming email is spam
- linear regression: model the relationship between numeric variables, for example, to predict one value from another
- decision trees: use a structure to represent a number of possible decision paths and an outcome for each path
Examples of Unsupervised machine learning models
- clustering: works with unlabeled data and attempts to "cluster" it. For example, a data set showing where millionaires live has clusters in places like Beverly Hills and Manhattan
- Latent Dirichlet Allocation (LDA): a natural language processing technique commonly used to identify common topics in a text or a set of documents
- neural networks: handwriting recognition and face image detection
When building models used to make predictions, we often train a model on an existing data set. The model may be re-trained as more training data becomes available. For example, we would re-train a collaborative filtering recommendation engine as we learned more about the events that led to product sales or targeted engagement metrics.
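For example, re-training a collaborative filtering recommendation engine with Spark MLlib might look like the following minimal sketch. It is not part of this tutorial's code; it assumes an existing SparkContext named sparkContext and a hypothetical ratings.csv of user,product,rating rows, and the rank and iteration values are illustrative.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// each line is "user,product,rating", e.g. "42,1001,5.0" (hypothetical data)
val ratings = sparkContext.textFile("ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  Rating(user.toInt, product.toInt, rating.toDouble)
}

// re-run training whenever the underlying data set grows
val model = ALS.train(ratings, 10, 10) // rank = 10, iterations = 10 (illustrative values)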
Apache Spark Machine Learning Example
Let’s show a demo of an Apache Spark machine learning program. In the following demo, we begin by training the k-means clustering model and then use this trained model to predict the language of an incoming text stream from Slack.
This example is built upon a previous Apache Spark Streaming tutorial which streams data from a Slack team site. See Resources section below for links.
But, let’s move forward with the demo:
Spark Machine Learning Scala Source Code Review
Now that we have the demo in mind, let's review the relevant Spark MLlib code. Again, links to the source code may be found in the Resources section below. Let's start with the entry point into our Spark machine learning example, the SlackMLApp object that was called via spark-submit in the demo:
import com.beust.jcommander.{JCommander, Parameter}
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.{SparkConf, SparkContext}

object SlackMLApp {

  object Config {
    @Parameter(names = Array("-st", "--slackToken"))
    var slackToken: String = null
    @Parameter(names = Array("-nc", "--numClusters"))
    var numClusters: Int = 4
    @Parameter(names = Array("-po", "--predictOutput"))
    var predictOutput: String = null
    @Parameter(names = Array("-td", "--trainData"))
    var trainData: String = null
    @Parameter(names = Array("-ml", "--modelLocation"))
    var modelLocation: String = null
  }

  def main(args: Array[String]) {
    new JCommander(Config, args.toArray: _*)
    val conf = new SparkConf().setAppName("SlackStreamingWithML")
    val sparkContext = new SparkContext(conf)

    // obtain an existing model or train a new one
    val clusters: KMeansModel =
      if (Config.trainData != null) {
        KMeanTrainTask.train(sparkContext, Config.trainData, Config.numClusters, Config.modelLocation)
      } else {
        if (Config.modelLocation != null) {
          new KMeansModel(sparkContext.objectFile[Vector](Config.modelLocation).collect())
        } else {
          throw new IllegalArgumentException("Either modelLocation or trainData should be specified")
        }
      }

    if (Config.slackToken != null) {
      SlackStreamingTask.run(sparkContext, Config.slackToken, clusters, Config.predictOutput)
    }
  }
}
The code above contains the main method and is called from spark-submit. As you can see, we will either train a new model or use an existing model when running the SlackStreamingTask, depending on the incoming command line arguments such as trainData, modelLocation, and slackToken.
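To make the argument handling concrete, here is a minimal sketch (not from the repository) of how that JCommander call populates the Config fields; the token and paths are placeholders.

// hypothetical argument array; the token and paths are placeholders
val exampleArgs = Array(
  "--slackToken", "not-a-real-token",
  "--trainData", "input",
  "--numClusters", "4",
  "--modelLocation", "model"
)

// JCommander parses the flags and sets the matching @Parameter fields on Config
new JCommander(Config, exampleArgs: _*)

println(Config.numClusters) // 4
println(Config.trainData)   // input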
Next in this source code analysis, we focus on 1) the code used to train the model in KMeanTrainTask, and 2) using the model to make predictions in SlackStreamingTask.
First, let's look at the relevant portion of KMeanTrainTask:
  def train(sparkContext: SparkContext, trainData: String, numClusters: Int, modelLocation: String): KMeansModel = {
    if (new File(modelLocation).exists) removePrevious(modelLocation)

    val trainRdd = sparkContext.textFile(trainData)
    val parsedData = trainRdd.map(Utils.featurize).cache()
    // if we had a really large data set to train on, we'd want to call an action to trigger cache.

    val model = KMeans.train(parsedData, numClusters, numIterations)
    sparkContext.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelLocation)

    val example = trainRdd.sample(withReplacement = false, 0.1)
      .map(s => (s, model.predict(Utils.featurize(s)))).collect()
    println("Prediction examples:")
    example.foreach(println)

    model
  }
When calling train, we first attempt to remove any previously saved model via removePrevious. (removePrevious isn't shown because it's not relevant to our focus on machine learning with Apache Spark.) Then we set up a new RDD called trainRdd. Since textFile accepts a String argument that may be a directory, it will read all files contained in the directory, which we called with "input".
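As a quick illustration of that behavior, the snippet below (paths and contents are hypothetical) points textFile at a directory and counts the lines it loads:

// "input" here is a directory, e.g. input/file1.txt, input/file2.txt, ...
val trainRdd = sparkContext.textFile("input")
println(trainRdd.count()) // total number of lines across all files in the directory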
Next, we must convert the elements (rows of text) in the RDD to a format suitable for KMeans. We do this by calling Utils.featurize, which looks like this:
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

object Utils {

  val NUM_DIMENSIONS: Int = 1000

  val tf = new HashingTF(NUM_DIMENSIONS)

  /**
   * Uses the hashing trick (via HashingTF) to transform a string's character bigrams
   * into a fixed-length vector of doubles, which is the input format k-means requires.
   */
  def featurize(s: String): Vector = {
    tf.transform(s.sliding(2).toSeq)
  }
}
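To get a feel for what featurize produces, here is a small, hypothetical spot check; the sample strings are arbitrary and the snippet assumes Utils is on the classpath:

// sliding(2) yields character bigrams: "spark" -> List(sp, pa, ar, rk)
println("spark".sliding(2).toList)

// each bigram is hashed into one of 1000 buckets, so the result is a
// 1000-dimensional, mostly sparse vector of term frequencies
val vec = Utils.featurize("spark is fun")
println(vec.size)        // 1000
println(vec.numNonzeros) // roughly one non-zero entry per distinct bigram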
Now, if we go back to our KMeanTrainTask object, we're in a position to train our model by calling the KMeans.train function with parsedData, numClusters, and numIterations. Afterward, we save the model and send a few example clustering predictions to the console by iterating over example and passing each to println.
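Because the cluster centers are persisted with saveAsObjectFile, they can later be read back and wrapped in a KMeansModel without re-training, which is what the modelLocation branch of SlackMLApp does. Here is that pattern in isolation; the "model" path is illustrative:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector

// read the saved centers back and rebuild the model without re-training
val reloaded = new KMeansModel(sparkContext.objectFile[Vector]("model").collect())
println(reloaded.clusterCenters.length) // should equal the numClusters used at training time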
Now that we have a trained model, let's look at SlackStreamingTask:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON

object SlackStreamingTask {

  def run(sparkContext: SparkContext, slackToken: String, clusters: KMeansModel, predictOutput: String) {
    val ssc = new StreamingContext(sparkContext, Seconds(5))
    val dStream = ssc.receiverStream(new SlackReceiver(slackToken))

    val stream = dStream // create a stream of events from Slack, then filter and marshal the JSON stream data
      .filter(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("type") == "message") // keep only message events
      .map(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("text")) // extract the message text from each event

    val kmeanStream = kMean(stream, clusters) // apply the k-means model to the stream
    kmeanStream.print() // print k-means results as (k, m) pairs, where k is the message text and m is the cluster the message belongs to

    if (predictOutput != null) {
      kmeanStream.saveAsTextFiles(predictOutput) // save the results to files, if an output path was specified
    }

    ssc.start() // start the Spark Streaming application
    ssc.awaitTermination() // wait for the application to terminate
  }

  /**
   * Transforms a stream of strings into a stream of (string, cluster id) pairs by
   * featurizing each string and using the model to predict its cluster.
   */
  def kMean(dStream: DStream[String], clusters: KMeansModel): DStream[(String, Int)] = {
    dStream.map(s => (s, Utils.featurize(s))).map(p => (p._1, clusters.predict(p._2)))
  }
}
The Spark MLlib code which makes clustering predictions with a previously saved model is clusters.predict. Before it is called, we map over the DStream and use featurize again so each message can be passed to predict. We return a DStream with the original text received from Slack and the predicted cluster.
If the Spark driver program is called with the predictOutput argument, the output is saved as text files.
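If you want to spot-check predictions without starting a StreamingContext, the same featurize-then-predict pattern works on a handful of strings directly; the sample messages below are made up:

// batch equivalent of kMean(): featurize each string, then ask the model for its cluster
val samples = Seq("the deploy finished", "build failed on jenkins", "lunch anyone?")
samples.foreach { s =>
  val cluster = clusters.predict(Utils.featurize(s))
  println(s"($s, $cluster)")
}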
Here's another screencast in which I describe the code in more detail.
Resources
Source code: https://github.com/tmcgrath/spark-course/tree/master/spark-ml
Background on Machine Learning: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-apache-spark-62310284
Spark MLlib: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-spark-mllib
Spark ML and Spark MLlib documentation
Featured image credit: https://flic.kr/p/ainDUT
Comments
Hi Todd, thanks so much for such a great tutorial. I followed your previous one too and then this machine learning one. I compiled with sbt and the jar file was created successfully. When I ran in single stand-alone mode, I got a message saying "16/10/12 12:48:17 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" and this message kept repeating again and again. So I tried to run the same from a server which has 7 slaves, but again I got the same message. Kindly tell me how to resolve this issue. Thanks so much in advance.
Nikitha,
Did you get this resolved? Can you post a screenshot of Spark UI to verify workers are running and available?
Todd
Hi, thanks for the tutorial. When I tried to run the jar file for training, I keep getting this message again and again:
16/10/13 03:37:23 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Can you please help me? Thanks a lot.
Have you been able to run this successfully? If yes, then please take some time to answer me.
I am struggling to figure out what is happening here. I trained the model first and saw some files being written into the model directory, then I ran the SlackMLApp, but I'm not sure what the output is here. I am running the above example on an HDP 2.5 cluster where Spark 1.6.2 is running.
Do I need to pass some text to let the code make a prediction, or is it running on the sample input again? Also, while running I am facing the issue below:
17/04/19 08:06:11 INFO ReceiverSupervisorImpl: Called receiver onStart
17/04/19 08:06:11 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
Exception in thread “Thread-56” java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at scalaj.http.HttpConstants$.liftedTree1$1(Http.scala:637)
at scalaj.http.HttpConstants$.<init>(Http.scala:636)
at scalaj.http.HttpConstants$.<clinit>(Http.scala)
at scalaj.http.BaseHttp$.$lessinit$greater$default$2(Http.scala:754)
at scalaj.http.Http$.<init>(Http.scala:738)
at scalaj.http.Http$.<clinit>(Http.scala)
at com.supergloo.SlackReceiver.webSocketUrl(SlackReceiver.scala:47)
at com.supergloo.SlackReceiver.receive(SlackReceiver.scala:38)
at com.supergloo.SlackReceiver.run(SlackReceiver.scala:34)
at java.lang.Thread.run(Thread.java:745)
17/04/19 08:06:15 INFO JobScheduler: Added jobs for time 1492603575000 ms
17/04/19 08:06:15 INFO JobScheduler: Starting job streaming job 1492603575000 ms.0 from job set of time 1492603575000 ms
—————————-
But this error is not stopping SlackMLApp from running, so it can be ignored, right? Please help me here, as this is the first time I am doing all this.
I think you should also publish the list of software versions used to run this code, something like:
- Scala: 2.11+
- Java: 1.6+
- Spark: 1.6.2+
Hello Todd, I just want to know how you ran this Scala code. I'm trying to run it in Eclipse, but I'm getting errors where the libraries are not being imported. What are the requirements to run this code?