- Use Spark API
- Use Sparkling Water API
- Get familiar with Spark shell
- Oracle Java 7+
- Spark 1.5.1
- Sparkling Water 1.5.6
- SMS dataset
Goal: For a given text message, identify if it is spam or not.
- Extract data
- Transform & tokenize messages
- Build Spark's TF-IDF model and expand messages to feature vectors
- Create and evaluate H2O's Deep Learning model
- Use the models to detect spam messages
- Run Sparkling shell with an embedded Spark cluster:
cd "path/to/sparkling/water"
export SPARK_HOME="/path/to/spark/installation"
export MASTER="local-cluster[3,2,4096]"
bin/sparkling-shell --conf spark.executor.memory=2G
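Here, the master string local-cluster[3,2,4096] tells Spark to launch an embedded cluster with 3 worker processes, each using 2 cores and 4096 MB of memory.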
Note: To avoid flooding the output with Spark INFO messages, I recommend editing your $SPARK_HOME/conf/log4j.properties file and setting the log level to WARN.
- Open Spark UI: Go to http://localhost:4040/ to see the Spark status.
- Prepare the environment:
// Input data
val DATAFILE="../data/smsData.txt"
// Common imports from H2O and Spark
import _root_.hex.deeplearning.{DeepLearningModel, DeepLearning}
import _root_.hex.deeplearning.DeepLearningParameters
import org.apache.spark.examples.h2o.DemoUtils._
import org.apache.spark.h2o._
import org.apache.spark.mllib
import org.apache.spark.mllib.feature.{IDFModel, IDF, HashingTF}
import org.apache.spark.rdd.RDD
import water.Key
- Define the representation of the training message:
// Representation of a training message
case class SMS(target: String, fv: mllib.linalg.Vector)
- Define the data loader and parser:
def load(dataFile: String): RDD[Array[String]] = {
// Load file into memory, split on TABs and filter all empty lines
sc.textFile(dataFile).map(l => l.split("\t")).filter(r => !r(0).isEmpty)
}
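A quick sanity check from the shell (each line of the file holds a label and the message text separated by a tab):
// Print the first parsed rows; each row is Array(label, messageText)
load(DATAFILE).take(3).foreach(r => println(r.mkString(" | ")))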
- Define the input messages tokenizer:
// Tokenizer
// For each sentence in the input RDD, it produces an array of strings representing the individual interesting words of the sentence
def tokenize(dataRDD: RDD[String]): RDD[Seq[String]] = {
// Ignore all useless words
val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
// Ignore all useless characters
val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'','!','0', '1')
// Invoke RDD API and transform input data
val textsRDD = dataRDD.map( r => {
// Get rid of all useless characters
var smsText = r.toLowerCase
for( c <- ignoredChars) {
smsText = smsText.replace(c, ' ')
}
// Remove empty and uninteresting words
val words = smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length>2).distinct
words.toSeq
})
textsRDD
}
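To see what the tokenizer produces, you can run it over a made-up message:
// Hypothetical example message
val sampleRDD = sc.parallelize(Seq("Hello, are you at the party?"))
tokenize(sampleRDD).collect()
// => Array(WrappedArray(hello, are, you, party))
// "the" and "at" are dropped as ignored words; punctuation is stripped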
- Configure Spark's TF-IDF model builder:
def buildIDFModel(tokensRDD: RDD[Seq[String]],
minDocFreq:Int = 4,
hashSpaceSize:Int = 1 << 10): (HashingTF, IDFModel, RDD[mllib.linalg.Vector]) = {
// Hash strings into the given space
val hashingTF = new HashingTF(hashSpaceSize)
val tf = hashingTF.transform(tokensRDD)
// Build term frequency-inverse document frequency model
val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
val expandedTextRDD = idfModel.transform(tf)
(hashingTF, idfModel, expandedTextRDD)
}
Wikipedia defines TF-IDF as: "tf–idf, short for term frequency–inverse document frequency, is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus. It is often used as a weighting factor in information retrieval and text mining. The tf-idf value increases proportionally to the number of times a word appears in the document, but is offset by the frequency of the word in the corpus, which helps to adjust for the fact that some words appear more frequently in general."
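In Spark MLlib's implementation this works out to a smoothed product of the two statistics: for a corpus of $m$ documents,

$$\mathrm{tfidf}(t, d) = \mathrm{tf}(t, d) \cdot \log\frac{m + 1}{\mathrm{df}(t) + 1}$$

where $\mathrm{tf}(t, d)$ is the (hashed) term frequency of term $t$ in document $d$ and $\mathrm{df}(t)$ is the number of documents containing $t$. Terms appearing in fewer than minDocFreq documents receive an IDF of 0 and thus drop out of the feature vectors.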
- Configure H2O's DeepLearning model builder:
def buildDLModel(trainHF: Frame, validHF: Frame,
epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
hidden: Array[Int] = Array[Int](200, 200))
(implicit h2oContext: H2OContext): DeepLearningModel = {
import h2oContext._
import _root_.hex.deeplearning.DeepLearning
import _root_.hex.deeplearning.DeepLearningParameters
// Create algorithm parameters
val dlParams = new DeepLearningParameters()
// Name for target model
dlParams._model_id = Key.make("dlModel.hex")
// Training dataset
dlParams._train = trainHF
// Validation dataset
dlParams._valid = validHF
// Column used as target for training
dlParams._response_column = 'target
// Number of passes over data
dlParams._epochs = epochs
// L1 penalty
dlParams._l1 = l1
// Number and size of hidden layers
dlParams._hidden = hidden
// Create a DeepLearning job
val dl = new DeepLearning(dlParams)
// And launch it
val dlModel = dl.trainModel.get
// Force computation of model metrics on both datasets
dlModel.score(trainHF).delete()
dlModel.score(validHF).delete()
// And return resulting model
dlModel
}
- Initialize H2OContext and start H2O services on top of Spark:
// Create SQL support
import org.apache.spark.sql._
implicit val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Start H2O services
import org.apache.spark.h2o._
val h2oContext = new H2OContext(sc).start()
- Open H2O UI and verify that H2O is running:
h2oContext.openFlow
At this point, you can use the H2O UI and see the status of the H2O cloud by typing getCloud.
- Build the final workflow using all building pieces:
// Data load
val dataRDD = load(DATAFILE)
// Extract response column from dataset
val hamSpamRDD = dataRDD.map( r => r(0))
// Extract message from dataset
val messageRDD = dataRDD.map( r => r(1))
// Tokenize message content
val tokensRDD = tokenize(messageRDD)
// Build IDF model on tokenized messages
// It returns:
// - hashingTF: hashing function to map a word into the vector space
// - idfModel: a model to transform a hashed sentence into a feature vector
// - tfidfRDD: transformed input messages
val (hashingTF, idfModel, tfidfRDD) = buildIDFModel(tokensRDD)
// Merge response with extracted vectors
val resultDF = hamSpamRDD.zip(tfidfRDD).map(v => SMS(v._1, v._2)).toDF
// Publish Spark DataFrame as H2OFrame
val tableHF = h2oContext.asH2OFrame(resultDF, "messages_table")
// Transform target column into categorical!
tableHF.replace(tableHF.find("target"), tableHF.vec("target").toCategoricalVec()).remove()
tableHF.update(null)
// Split table into training and validation parts
val keys = Array[String]("train.hex", "valid.hex")
val ratios = Array[Double](0.8)
val frs = split(tableHF, keys, ratios)
val (trainHF, validHF) = (frs(0), frs(1))
tableHF.delete()
// Build final DeepLearning model
val dlModel = buildDLModel(trainHF, validHF)(h2oContext)
- Evaluate the model's quality:
// Collect model metrics and evaluate model quality
import water.app.ModelMetricsSupport
val trainMetrics = ModelMetricsSupport.binomialMM(dlModel, trainHF)
val validMetrics = ModelMetricsSupport.binomialMM(dlModel, validHF)
println(trainMetrics.auc._auc)
println(validMetrics.auc._auc)
You can also open the H2O UI and type getPredictions to visualize the model's performance, or type getModels to see model output.
- Create a spam detector:
// Spam detector
def isSpam(msg: String,
dlModel: DeepLearningModel,
hashingTF: HashingTF,
idfModel: IDFModel,
h2oContext: H2OContext,
hamThreshold: Double = 0.5): String = {
val msgRDD = sc.parallelize(Seq(msg))
val msgVector: DataFrame = idfModel.transform(
hashingTF.transform(tokenize(msgRDD)))
.map(v => SMS("?", v)).toDF
val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
msgTable.remove(0) // remove first column
val prediction = dlModel.score(msgTable)
if (prediction.vecs()(1).at(0) < hamThreshold) "SPAM DETECTED!" else "HAM"
}
- Try to detect spam:
isSpam("Michal, h2oworld party tonight in MV?", dlModel, hashingTF, idfModel, h2oContext)
//
isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", dlModel, hashingTF, idfModel, h2oContext)
- At this point, you have finished your first Sparkling Water machine learning application. Hack and enjoy! Thank you!
- Try using a different H2O model, for example GBM; a minimal sketch follows below.
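For inspiration, here is a minimal, untested sketch of swapping in H2O's GBM. It assumes the trainHF and validHF frames from the workflow above, and that GBMParameters follows the same pattern as DeepLearningParameters; the _ntrees value is an arbitrary example choice:
// Minimal GBM sketch (assumes trainHF and validHF from above)
import _root_.hex.tree.gbm.GBM
import _root_.hex.tree.gbm.GBMModel.GBMParameters
val gbmParams = new GBMParameters()
// Training and validation datasets
gbmParams._train = trainHF
gbmParams._valid = validHF
// Column used as target for training
gbmParams._response_column = "target"
// Number of trees to build (arbitrary example value)
gbmParams._ntrees = 50
// Launch the job and wait for the resulting model
val gbmModel = new GBM(gbmParams).trainModel.get
// Metrics can then be collected the same way as for the Deep Learning model
val gbmValidMetrics = ModelMetricsSupport.binomialMM(gbmModel, validHF)
println(gbmValidMetrics.auc._auc)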
- Spark's TF-IDF algorithm
- Conceptual API to call H2O algorithms
- Principle of training and using models for prediction