Learn How to Code and Deploy Machine Learning Models on Structured Streaming

This post is a token of appreciation for the amazing open source community of Data Science, to which I owe a lot of what I have learned.

For the last few months I have been working on my first streaming application, which went into production a month ago. It has been a great learning experience with numerous challenges, some of which I have tried to share here.

This post is focused on how to deploy machine learning models on streaming data and covers all three areas necessary for a successful production application: infrastructure, technology and monitoring.

DEVELOPING A MACHINE LEARNING MODEL WITH SPARK-ML AND STRUCTURED STREAMING

The first step for any successful application is to determine the technology stack in which it should be written, based on the business requirements. In our case, the primary business requirement was the ability to process data to the tune of a million messages per minute and report errors within a span of 5 minutes from their time of occurrence.

These conditions led to the choice of Spark as the framework for the application. The primary benefits of using Spark were its proven ability in big data processing and the availability of built-in distributed machine learning libraries. With the introduction of the Dataset API in Spark 2.0, it has become easier than ever to code up a machine learning algorithm.

In my experience, working with machine learning models becomes extremely easy with proper use of the Pipeline framework. In the snippets below I have tried to cover how to use this API to build, save and use models for prediction.

To build and save a model, one can follow the code structure below.

// Imports needed for the training pipeline
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession

// Create a Spark session
val spark = SparkSession.builder.getOrCreate()
// Read the training data from a source. In this case I am reading it from an S3 location
var data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("pathToFile")
// Select the needed fields
data = data.select("field1", "field2", "field3")
// Perform pre-processing on the data
val process1 = ... some process ...
val process2 = ... some process ...
// Define an evaluator
val evaluator = ... evaluator of your choice ...
// Split the data into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(ratio1, ratio2))
// Define the algorithm to train, for example a decision tree
val dt = new DecisionTreeClassifier()
  .setFeaturesCol(featureColumn)
  .setLabelCol(labelColumn)
// Define the linear pipeline. Stages are executed in the order given, so the sequence is binding
val pipelineDT = new Pipeline().setStages(Array(process1, process2, dt))
// Define the hyper-parameter grid to search over (an empty grid just uses the pipeline defaults)
val paramGrid = new ParamGridBuilder().build()
// Define the cross validator for executing the pipeline and performing cross validation
val cvLR = new CrossValidator()
  .setEstimator(pipelineDT)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)  // Use 3+ in practice
// Fit the model on the training data
val cvModelLR = cvLR.fit(trainingData)
// Extract the best trained pipeline model from the cross validator
val bestPipelineModel = cvModelLR.bestModel.asInstanceOf[PipelineModel]
// Save the model to an S3 bucket
cvModelLR.write.overwrite().save(mlModelPath)

Once a model is saved, it can easily be used for prediction on streaming data with the following steps.

1. Read data from a Kafka topic
// Imports needed for reading the stream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Create a Spark session object
val ss = SparkSession.builder.getOrCreate()
import ss.implicits._

// Define the schema of the topic to be consumed
val schema = StructType(Seq(
  StructField("Field1", StringType, true),
  StructField("Field2", IntegerType, true)
))

// Start reading from a Kafka topic
val records = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("subscribe", kafkaTopic)
  .load()
  .selectExpr("cast (value as string) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")

2. Load a saved ML model and use it for prediction

import org.apache.spark.ml.tuning.CrossValidatorModel

// Load the classification model from the saved location
val classificationModel = CrossValidatorModel.read.load(mlModelPath)
// Use the model to perform predictions; the best model found during cross validation is applied
val results = classificationModel.transform(records)

3. Save the results to S3 or other locations

In CSV format

// Saving results to a location as CSV
results.writeStream.format("csv").outputMode("append")
  .option("path", destination_path)
  .option("checkpointLocation", checkpointPath)
  .start()

In Parquet format

// Saving results to a location as Parquet
results.writeStream.format("parquet").outputMode("append")
  .option("path", destination_path)
  .option("checkpointLocation", checkpointPath)
  .start()

Or, if we want to send the results to a database or some other external system:

val writer = new JDBCSink(url, user, password)
results.writeStream
            .foreach(writer)
            .outputMode("append")
            .option("checkpointLocation", checkpointPath)
            .start()

A separate writer needs to be implemented for this purpose by extending the ForeachWriter interface provided with Spark Structured Streaming. A sample JDBC sink is shown below, taken from https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html

import java.sql._
import org.apache.spark.sql.ForeachWriter

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[(String, String)] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: (String, String)): Unit = {
    statement.executeUpdate("INSERT INTO zip_test " +
      "VALUES (" + value._1 + "," + value._2 + ")")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

MONITORING, LOGGING AND ALERTS

The next step is to integrate monitoring, alerting and logging services into the application, so as to get instantaneous alerts and keep tabs on how the application is performing. We ended up using CloudWatch for regular monitoring and Slack for alerting.

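The alerting code itself is outside the scope of this post, but as a rough sketch of where such metrics can come from inside the application: Structured Streaming exposes per-batch progress through a StreamingQueryListener, and those numbers are what a CloudWatch dashboard or a Slack alert can be driven from. The listener below only prints; the forwarding logic is left out, and the ss session name is assumed from the earlier snippet.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Attach a listener so every micro-batch reports its progress
ss.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println(s"Query started: ${event.id}")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // numInputRows and the triggerExecution duration are part of the standard progress report
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} rows, " +
      s"${event.progress.durationMs.get("triggerExecution")} ms")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // A non-empty exception here is what an alert would be raised on
    println(s"Query terminated: ${event.id}, exception: ${event.exception}")
  }
})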

INFRASTRUCTURE

Once the code was ready for deployment, it was time to choose the appropriate infrastructure to deploy it on. We decided to go with Kafka (mainly because of its multi-publisher/consumer architecture and the ability to set retention periods per topic), AWS EMR as the core infrastructure for running the applications, and AWS SQS for storing results to be consumed downstream.

AWS EMR was the obvious choice owing to the availability of clusters with pre-installed Spark and internal resource management. The ability to spin up a new cluster with a full deployment in less than 30 minutes in case of a major failure was also a major plus point.

Here is how our architecture looked: Kafka topics feeding the Spark Structured Streaming application on EMR, with results pushed to AWS SQS for downstream consumers.

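The results reached SQS through a custom sink, implemented along the same lines as the JDBC sink shown earlier. Below is only a rough sketch, not the exact production code: the queue URL, the row serialization and the use of the AWS SDK v1 SQS client are placeholder assumptions.

import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sink that pushes each prediction row to an SQS queue
class SQSSink(queueUrl: String) extends ForeachWriter[Row] {
  var sqs: AmazonSQS = _

  def open(partitionId: Long, version: Long): Boolean = {
    sqs = AmazonSQSClientBuilder.defaultClient()
    true
  }

  def process(value: Row): Unit = {
    // Serialize the row however the downstream consumers expect it
    sqs.sendMessage(queueUrl, value.mkString(","))
  }

  def close(errorOrNull: Throwable): Unit = {
    // The default client manages its own connection pool, so there is nothing to release here
  }
}

// Usage: results.writeStream.foreach(new SQSSink(queueUrl)).option("checkpointLocation", checkpointPath).start()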

TUNING A SPARK JOB

Lastly, as with any other Spark job, a streaming job needs tuning for maximum efficiency. The first step in tuning a Spark job is to choose appropriate instances for it. After performing several experiments on M4 vs C4 instance types, we found M4 to perform better, primarily because of its ability to provide virtual cores as well.

Spark's dynamic allocation property (spark.dynamicAllocation.enabled) was also extremely useful in maximizing utilization in a stable way. There are a number of other parameters that I found useful for tweaking performance; they are listed below and assembled into a single spark-submit command after the list.

a) --conf spark.yarn.executor.memoryOverhead=1024: The amount of off-heap overhead memory (in MB) allocated per executor.

b) --conf spark.yarn.maxAppAttempts=4: The maximum number of attempts that will be made to submit the application. It is quite useful in scenarios where multiple Spark jobs are submitted to a single cluster and submissions sometimes fail because of a lack of available resources.

c) --conf spark.task.maxFailures=8: The maximum number of times a task can fail before the Spark job itself fails. The default value is 4. It is always a good idea to keep this number higher.

d) --conf spark.speculation=false: When this property is set to true, Spark speculatively re-launches tasks that appear to be running slowly in a stage. In our case we did not find it to contribute much to performance, but it is a good property to consider when processing skewed data sets.

e) --conf spark.yarn.max.executor.failures=15: The maximum number of executor failures before the application fails. Always set it to a higher number.

f) --conf spark.yarn.executor.failuresValidityInterval=1h: Defines the time window over which executor failures are counted. Combined with the property above, it means that at most 15 executors can fail within any one hour before the job dies.

g) --driver-memory 10g: Provide sufficiently high driver memory so that the job does not fail when a burst of messages has to be processed.
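
Put together, a submit command using these settings would look roughly like the one below; the master and deploy-mode flags, the application class and the jar name are placeholders for illustration.

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 10g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.task.maxFailures=8 \
  --conf spark.speculation=false \
  --conf spark.yarn.max.executor.failures=15 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --class com.example.StreamingPredictionApp \
  streaming-prediction-app.jar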

For more detail on how to tune a Spark job, please refer to my post Tuning a Spark Job on medium.com.

Lastly, I would just like to mention that it was great fun working on this project and a pleasure sharing the experience with you all. I hope it helps. Please feel free to connect with me on LinkedIn or reach out with any questions.
