In many use cases, Machine Learning (ML) models are built and applied over data that is stored and managed by Azure Data Explorer (ADX). Most ML models are built and deployed in two steps:



  • Offline training

  • Real time scoring


ML training is a long and iterative process. Commonly, a model is developed by researchers or data scientists: they fetch the training data, clean it, engineer features, and try different models while tuning parameters, repeating this cycle until the model meets the required accuracy and robustness. To improve accuracy, they can:



  • Use a big data set, if available, that might contain hundreds of millions of records and many features (dimensions and metrics)

  • Train very complex models, e.g., a DNN with dozens of layers and millions of free parameters

  • Perform a more exhaustive search for tuning the model’s hyperparameters


Once the model is ready, it can be deployed to production for scoring.


 


ML Scoring is the process of applying the model on new data to get predictions/regressions. Scoring usually needs to be done with minimal latency (near real time) for batches of streamed data.


 


Azure Data Explorer (ADX) supports running inline Python scripts that are embedded in the KQL query. The Python code runs on the existing compute nodes of ADX, in a distributed manner near the data. It can handle DataFrames containing many millions of records, partitioned and processed on multiple nodes. This optimized architecture results in great performance and minimal latency.


Specifically, for ML workloads, ADX can be used for both training and scoring:



  • Scoring on ADX is the best option for data that is stored in ADX, as:

    • Processing is done near the data, which guarantees the fastest performance

    • Embedding the scoring Python code in a KQL query is simple, robust and cheap relative to using an external scoring service, which requires management, networking, security, etc.




    Scoring can be done using the predict_fl() library function.



  • Training on ADX can be done when the full training data set is stored in ADX, the training process takes up to a few minutes, and GPUs or other special hardware aren’t required


Still, in many scenarios training is done on big data systems such as Spark/Databricks. Specifically, ML training on these systems is preferred when:



  • The training data is not stored in ADX, but in the data lake or other external storage/db

  • The training process is long (takes more than 5-10 minutes), usually done in batch/async mode

  • Training can be accelerated by using GPUs

  • ADX production workflows must not be compromised by lengthy, CPU-intensive training jobs


So we end up with a workflow that uses Spark/Databricks for training and ADX for scoring. The problem is that training on these Spark platforms is mostly done using the Spark ML framework, which is optimized for the Spark architecture but not supported by a plain vanilla Python environment like the one ADX runs. So how can we still score in ADX?


We present a solution which is built from these steps:



  1. Fetch the training data from ADX to Azure Databricks using ADX Spark Connector

  2. Train an ML model in Azure Databricks

  3. Convert the model to ONNX

  4. Serialize and export the model to ADX using the same Spark connector

  5. Score in ADX using onnxruntime


Prerequisites



  • Enable the Python plugin on your ADX cluster (see the Onboarding section of the python() plugin doc)

  • Create a workspace in Azure Databricks

  • Install the Spark connector in that workspace as explained here

  • Install onnxmltools in that workspace
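
For the onnxmltools prerequisite, a notebook-scoped install from a Databricks cell is one option. A minimal sketch (onnxruntime is added here only as an assumption, for the optional local check in step 3; versions are left to the reader):

%pip install onnxmltools onnxruntime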


In the following example we build a logistic regression model to predict room occupancy based on Occupancy Detection data, a public dataset from the UCI Repository. This model is a binary classifier that predicts occupied/empty room based on temperature, humidity, light and CO2 sensor measurements. The example contains code snippets from a Databricks notebook showing the full process of retrieving the data from ADX, building the model, converting it to ONNX and pushing it back to ADX. Finally, it shows the KQL scoring query, to be run using Kusto Explorer.


 


1. Load the data from ADX to Databricks


 

from pyspark.sql import SparkSession
pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate()

cluster = 'https://demo11.westus.kusto.windows.net'
db = 'ML'
query = 'OccupancyDetection'

AppId = '***** Your App Id *****'
AppSecret = '***** Your App Secret *****'
AuthorityId = '***** Your Authority Id *****'

# Read the data from the kusto table with default reading mode

s_df = (pyKusto.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", cluster)
    .option("kustoDatabase", db)
    .option("kustoQuery", query)
    .option("kustoAadAppId", AppId)
    .option("kustoAadAppSecret", AppSecret)
    .option("kustoAadAuthorityID", AuthorityId)
    .load())

s_df.take(4)
Out[37]: [Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.18, Humidity=27.272, Light=426.0, CO2=721.25, HumidityRatio=0.004792988, Occupancy=True, Test=False),

 Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.15, Humidity=27.2675, Light=429.5, CO2=714.0, HumidityRatio=0.004783441, Occupancy=True, Test=False),

 Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 53), Temperature=23.15, Humidity=27.245, Light=426.0, CO2=713.5, HumidityRatio=0.004779464, Occupancy=True, Test=False),

 Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 54), Temperature=23.15, Humidity=27.2, Light=426.0, CO2=708.25, HumidityRatio=0.004771509, Occupancy=True, Test=False)]

 


2. Train the ML model in Azure Databricks


 

s_df.groupBy('Test', 'Occupancy').count().show()
+-----+---------+-----+
| Test|Occupancy|count|
+-----+---------+-----+
| true|    false| 9396|
| true|     true| 3021|
|false|    false| 6414|
|false|     true| 1729|
+-----+---------+-----+
# Prepare the input for the model

# Spark Logistic Regression estimator requires integer label so create it from the boolean Occupancy column
s_df = s_df.withColumn('Label', s_df['Occupancy'].cast('int'))

# Split to train & test sets
s_train = s_df.filter(s_df.Test == False)
s_test = s_df.filter(s_df.Test == True)

# Create the Logistic Regression model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# The Logistic Regression estimator expects the features in a single column so create it using VectorAssembler
features = ('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio')
assembler = VectorAssembler(inputCols=features,outputCol='Features')
s_train_features = assembler.transform(s_train)
s_train_features.take(4)
lr = LogisticRegression(labelCol='Label', featuresCol='Features',maxIter=10)
s_clf = lr.fit(s_train_features)
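
# (Added sketch, not from the original notebook:) peek at the fitted model;
# the coefficients and intercept reappear, as +/- pairs per class, in the
# ONNX dump of step 3 below
print(s_clf.coefficients)
print(s_clf.intercept)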

# Predict the training set
s_predict_train = s_clf.transform(s_train_features)

# Predict the testing set
s_test_features = assembler.transform(s_test)
s_predict_test = s_clf.transform(s_test_features)
s_predict_test.select(['Timestamp', 'Features', 'Label', 'prediction']).show(10)
+-------------------+--------------------+-----+----------+
|          Timestamp|            Features|Label|prediction|
+-------------------+--------------------+-----+----------+
|2015-02-02 14:19:00|[23.7,26.272,585....|    1|       1.0|
|2015-02-02 14:19:00|[23.718,26.29,578...|    1|       1.0|
|2015-02-02 14:21:00|[23.73,26.23,572....|    1|       1.0|
|2015-02-02 14:22:00|[23.7225,26.125,4...|    1|       1.0|
|2015-02-02 14:23:00|[23.754,26.2,488....|    1|       1.0|
|2015-02-02 14:23:00|[23.76,26.26,568....|    1|       1.0|
|2015-02-02 14:25:00|[23.73,26.29,536....|    1|       1.0|
|2015-02-02 14:25:00|[23.754,26.29,509...|    1|       1.0|
|2015-02-02 14:26:00|[23.754,26.35,476...|    1|       1.0|
|2015-02-02 14:28:00|[23.736,26.39,510...|    1|       1.0|
+-------------------+--------------------+-----+----------+
only showing top 10 rows
# Calculate accuracy on the testing set

import pyspark.sql.functions as F
check = s_predict_test.withColumn('correct', F.when(F.col('Label') == F.col('prediction'), 1).otherwise(0))
check.groupby('correct').count().show()
accuracy = check.filter(check['correct'] == 1).count()/check.count()*100
print(f'Accuracy: {accuracy}')
+-------+-----+
|correct|count|
+-------+-----+
|      1|12271|
|      0|  146|
+-------+-----+

Accuracy: 98.8241926391238
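
As noted in the introduction, accuracy can often be improved by a more exhaustive hyperparameter search. Below is a minimal sketch (not part of the original notebook) of how the single fit above could be replaced with a cross-validated grid search using Spark ML’s tuning API; the grid values are arbitrary examples:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Small, arbitrary grid over regularization and iteration count
grid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.maxIter, [10, 50])
    .build())

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol='Label'),
                    numFolds=3)

# Keep the best model; it can feed the ONNX conversion in the next step
s_clf = cv.fit(s_train_features).bestModel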

 


3. Convert the model to ONNX


 

from onnxmltools import convert_sparkml
from onnxmltools.convert.sparkml.utils import FloatTensorType

initial_types = [('Features', FloatTensorType([None, 5]))]
onnx_model = convert_sparkml(s_clf, 'Occupancy detection Pyspark Logistic Regression model', initial_types, spark_session = pyKusto)
onnx_model
{'classlabels_ints': [0, 1],
 'coefficients': [0.2995554662269534,
                  0.08678036676466962,
                  -0.01768699375517248,
                  -0.005589950773872156,
                  19.092004694715197,
                  -0.2995554662269534,
                  -0.08678036676466962,
                  0.01768699375517248,
                  0.005589950773872156,
                  -19.092004694715197],
 'intercepts': [1.396631045353889, -1.396631045353889],
 'multi_class': 1,
 'name': 'LinearClassifier',
 'post_transform': 'LOGISTIC'}

(full print trimmed here)
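
Before exporting, it’s worth a quick sanity check that the converted model actually runs. Here is a minimal sketch (an addition, not in the original notebook) that scores the first training row from the take(4) output of step 1 with onnxruntime, assuming it is installed on the cluster:

import numpy as np
import onnxruntime as rt

sess = rt.InferenceSession(onnx_model.SerializeToString())
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name

# Temperature, Humidity, Light, CO2, HumidityRatio of an occupied room
sample = np.array([[23.18, 27.272, 426.0, 721.25, 0.004792988]], dtype=np.float32)
print(sess.run([label_name], {input_name: sample})[0])   # expect [1]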

 


4. Export the model to ADX


 

import datetime
import pandas as pd

smodel = onnx_model.SerializeToString().hex()
models_tbl = 'Databricks_Models'
model_name = 'Occupancy_Detection_LR'

# Create a DataFrame containing a single row with model name, training time and
# the serialized model, to be appended to the models table
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
sdfm = spark.createDataFrame(dfm)
sdfm.show()
+--------------------+--------------------+--------------------+
|                name|           timestamp|               model|
+--------------------+--------------------+--------------------+
|Occupancy_Detecti...|2021-01-26 19:02:...|0807120b4f6e6e784...|
+--------------------+--------------------+--------------------+

 


 

# Write the model to Kusto
(sdfm.write.format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", cluster)
    .option("kustoDatabase", db)
    .option("kustoAadAppId", AppId)
    .option("kustoAadAppSecret", AppSecret)
    .option("kustoAadAuthorityID", AuthorityId)
    .option("kustoTable", models_tbl)
    .mode("Append")
    .save())
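
Optionally, verify the append succeeded by reading the model row back through the same connector. A sketch, reusing the connection variables from step 1:

m_df = (pyKusto.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", cluster)
    .option("kustoDatabase", db)
    .option("kustoQuery", models_tbl + ' | where name == "' + model_name + '" | top 1 by timestamp desc')
    .option("kustoAadAppId", AppId)
    .option("kustoAadAppSecret", AppSecret)
    .option("kustoAadAuthorityID", AuthorityId)
    .load())

# The serialized hex string should round-trip unchanged
assert m_df.first()['model'] == smodel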

 


5. Score in ADX


Scoring is done by calling predict_onnx_fl(). You can either install this function in your database, or call it in an ad hoc manner, as in the query below:


 

let predict_onnx_fl=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    '\n'
    'import binascii\n'
    '\n'
    'smodel = kargs["smodel"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'bmodel = binascii.unhexlify(smodel)\n'
    '\n'
    'import onnxruntime as rt\n'
    'sess = rt.InferenceSession(bmodel)\n'
    'input_name = sess.get_inputs()[0].name\n'
    'label_name = sess.get_outputs()[0].name\n'
    'df1 = df[features_cols]\n'
    'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs)
};
//
OccupancyDetection 
| where Test == 1
| extend pred_Occupancy=int(null)
| invoke predict_onnx_fl(Databricks_Models, 'Occupancy_Detection_LR', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize correct = countif(Occupancy == pred_Occupancy), incorrect = countif(Occupancy != pred_Occupancy), total = count()
| extend accuracy = 100.0*correct/total
correct	incorrect	total	accuracy
12271	146	        12417	98.8241926391238

 


Summary


In this blog we presented how to train your ML model in Azure Databricks and use it for scoring in ADX. This is done by converting the trained model from Spark ML to ONNX, a common ML model exchange format, enabling it to be consumed for scoring by the ADX python() plugin.


This workflow is common for ADX customers who build machine learning models by batch training on Spark/Databricks over big data stored in the data lake. The option to use these models for scoring directly in ADX is very appealing, as it’s fast, simple and free.
