Using Python with SPSS Modeler extension node (2) Model creation using Spark MLlib

0. Introduction

SPSS Modeler provides a wide range of functions commonly used in analysis, but there are times when you also want to use R or Python. You can run R and Python from SPSS Modeler by using its extension nodes.

In this article, I will create a model with Spark MLlib through the Python integration. For the environment setup required for the Python integration, see "Using Python with SPSS Modeler extension nodes (1) Setup and visualization".

If you want to try the R integration, see @kawada2017's article "Use R with SPSS Modeler extension nodes".

■ Test environment
- SPSS Modeler 18.2.2
- Windows 10
- Python 3.7.7

1. Data preparation

Download the iris dataset to use for checking the operation: https://github.com/mwaskom/seaborn-data/blob/master/iris.csv

Start SPSS Modeler, add a Variable File node, and load iris.csv.

Add a Type node and a Partition node and connect them.

Open the properties of the Partition node and add a setting to split the data into **training** and **test** partitions at a ratio of your choice. After adding the settings, click Apply. In this example, the data is split into 80% training and 20% test. After splitting, preview the Partition node.

The previewed data has an additional column called Partition, which contains either "1_Training" or "2_Test". If you specify Partition = '1_Training' in a Select node, you can extract only the training data and use it for model building.

Add a Select (condition extraction) node and connect it to the Partition node. Open the node's properties and enter Partition = '1_Training' as shown below.

Also, rename the node so that its purpose is easier to understand. On the Annotations tab, specify "Training data" as the node name and click OK.

With the steps up to this point, the data is ready.
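
As an aside, the same 80/20 split could also be done directly in Python for Spark with DataFrame.randomSplit. The following is only a minimal sketch for comparison, not part of the stream built here; df is assumed to hold the iris data as a Spark DataFrame.


# Sketch only: split a Spark DataFrame 80/20 in PySpark instead of using
# the Partition and Select nodes; df is assumed to hold the iris data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(train_df.count(), test_df.count())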

2. Creating the extension model

From the Modeling tab, add an Extension Model node and connect it to the Training data node.

Open the properties of the Extension Model node. The properties are divided into two sections: the model building syntax and the model scoring syntax. We will create each of them in turn.

2-1. Creating the model building syntax

The model building syntax is written in Python for Spark. The general flow is as follows; each part is explained separately below.

  1. Preprocessing common to extension nodes
  2. Modeling (general PySpark and MLlib syntax)
  3. Saving the model (extension-node-specific)

Overall syntax for modeling


# Import the library for interacting with Analytic Server
import spss.pyspark.runtime

# Define the Analytic Server context object
ascontext = spss.pyspark.runtime.getContext()

# Define the Spark context
sc = ascontext.getSparkContext()

# Read the input data
df = ascontext.getSparkInputData()

# Import pyspark and MLlib modules
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
import numpy
import json

# Input fields (explanatory variables)
inputs = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Prediction target field (objective variable)
target = "species"

# Preprocessing before modeling:
# encode the string labels into numeric labels
labelIndexer = StringIndexer(inputCol=target, outputCol="label").fit(df)

# Print the correspondence between the string labels and the encoded numeric labels.
# This is needed later when building the process that decodes the scoring results.
print(labelIndexer.labels)

# Assemble the input fields into a single feature vector;
# MLlib requires this vectorization when there are multiple input variables
assembler = VectorAssembler(inputCols=inputs, outputCol="features")

# Define a random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Define the steps as a pipeline
pipeline = Pipeline(stages=[labelIndexer, assembler, rf])

# Build the pipeline model
model = pipeline.fit(df)

# Check the accuracy of the model (evaluated on the training data itself)
results = model.transform(df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(results)
print("Training Error = %g" % (1.0 - accuracy))

# Save the model
modelpath = ascontext.createTemporaryFolder()
model.save(modelpath)
ascontext.setModelContentFromPath("model", modelpath)

print("Save model completed.")

**1. Preprocessing common to extension nodes**

This part always needs to be written when using Python in an extension node.

Import the spss.pyspark.runtime library for interacting with Analytic Server. Define the Analytic Server context object in the variable ascontext, then define the Spark context in the variable sc via the Analytic Server context.

The variable df stores the data coming from the previous node. In this case, the training data extracted by the Select node is stored as a Spark DataFrame.

1. Preprocessing common to extension nodes


# Import the library for interacting with Analytic Server
import spss.pyspark.runtime

# Define the Analytic Server context object
ascontext = spss.pyspark.runtime.getContext()

# Define the Spark context
sc = ascontext.getSparkContext()

# Read the input data
df = ascontext.getSparkInputData()

**2. Modeling (general PySpark and MLlib syntax)**

The modeling part is basically ordinary PySpark and MLlib code, so it can be written by following the MLlib guide.

- Machine Learning Library (MLlib) Guide: http://mogile.web.fc2.com/spark/spark211/ml-guide.html
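
Because the modeling part is ordinary PySpark/MLlib code, it can also be prototyped outside SPSS Modeler first, for example in a standalone pyspark session, and then pasted into the extension node. The following is only a minimal sketch under that assumption; the local file path and the SparkSession setup are assumptions and are not part of the extension node syntax, where ascontext.getSparkInputData() supplies the data instead.


# Standalone prototype (assumption: run in a local pyspark session, not in the extension node)
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder.appName("iris-prototype").getOrCreate()

# Load the same iris.csv downloaded earlier (the file path is an assumption)
df = spark.read.csv("iris.csv", header=True, inferSchema=True)

inputs = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
labelIndexer = StringIndexer(inputCol="species", outputCol="label").fit(df)
assembler = VectorAssembler(inputCols=inputs, outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

model = Pipeline(stages=[labelIndexer, assembler, rf]).fit(df)
model.transform(df).select("species", "prediction", "probability").show(5)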

This time, we will create a model that classifies species (the iris variety), one of the fields contained in the iris dataset. The four numeric fields other than the species field are used as input data, and a random forest is used as the classification algorithm.

- Input fields (explanatory variables)
  - sepal_length: unit is cm
  - sepal_width: unit is cm
  - petal_length: unit is cm
  - petal_width: unit is cm

- Prediction target (objective variable)
  - species: three categories, setosa, versicolor, and virginica

Spark MLlib cannot handle the string category names directly, so StringIndexer is used to encode them into numeric labels. At this point, make a note of the correspondence between the category names and the numeric labels. In the example, the species names are printed in the order in which they were encoded as numeric labels.
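
As a supplementary note, StringIndexer orders labels by descending frequency by default, and with equal frequencies (as in iris, 50 rows per species) the resulting order is not predictable, which is why it must be checked rather than assumed. The fitted StringIndexerModel exposes the mapping through its labels attribute; a minimal sketch of printing it explicitly, reusing the labelIndexer variable defined in the syntax below:


# Print each numeric label together with the species name it encodes.
# labelIndexer is the fitted StringIndexerModel defined in the syntax below.
for index, label in enumerate(labelIndexer.labels):
    print("label %d -> %s" % (index, label))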


2. Modeling


# Import pyspark and MLlib modules
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
import numpy
import json

# Input fields (explanatory variables)
inputs = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Prediction target field (objective variable)
target = "species"

# Preprocessing before modeling:
# encode the string labels into numeric labels
labelIndexer = StringIndexer(inputCol=target, outputCol="label").fit(df)

# Print the correspondence between the string labels and the encoded numeric labels.
# This is needed later when building the process that decodes the scoring results.
print(labelIndexer.labels)

# Assemble the input fields into a single feature vector;
# MLlib requires this vectorization when there are multiple input variables
assembler = VectorAssembler(inputCols=inputs, outputCol="features")

# Define a random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Define the steps as a pipeline
pipeline = Pipeline(stages=[labelIndexer, assembler, rf])

# Build the pipeline model
model = pipeline.fit(df)

# Check the accuracy of the model (evaluated on the training data itself)
results = model.transform(df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(results)
print("Training Error = %g" % (1.0 - accuracy))

**3. Saving the model (extension-node-specific)**

Finally, save the trained model. Here, a temporary storage area is created with createTemporaryFolder() of the Analytic Server context object and used as the save destination path for the model. This temporary area is created under the SPSS Modeler temp folder.

3. Saving the model


# Save the model
modelpath = ascontext.createTemporaryFolder()
model.save(modelpath)
ascontext.setModelContentFromPath("model", modelpath)

print("Save model completed.")

See the SPSS Modeler manual for more information on creating models.

- Scripts using Python for Spark > Creating Models: https://www.ibm.com/support/knowledgecenter/ja/SS3RA7_18.2.2/modeler_r_nodes_ddita/clementine/r_pyspark_api.html

2-2. Creating the model scoring syntax

The model scoring syntax is also written in Python for Spark. The general flow is as follows; each part is explained separately below.

  1. Preprocessing common to extension nodes
  2. Preparation of the scoring result output
  3. Scoring execution

Scoring syntax


# Import the library for interacting with Analytic Server
import spss.pyspark.runtime

# Define the Analytic Server context object
ascontext = spss.pyspark.runtime.getContext()

# Define the Spark context
sc = ascontext.getSparkContext()

# Import pyspark and MLlib modules
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml import PipelineModel
from pyspark.sql import Row
from pyspark.sql.types import DoubleType, StructField
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
import numpy
import json

# Column definitions (schema) to output
outputSchema = ascontext.getSparkInputSchema()  # get the column definitions of the input data
outputSchema.fields.append(StructField('prediction', DoubleType(), nullable=True))    # column for the predicted label
outputSchema.fields.append(StructField('probability_0', DoubleType(), nullable=True)) # column for the probability of category 0
outputSchema.fields.append(StructField('probability_1', DoubleType(), nullable=True)) # column for the probability of category 1
outputSchema.fields.append(StructField('probability_2', DoubleType(), nullable=True)) # column for the probability of category 2

# Register the output column definitions in the Analytic Server context
ascontext.setSparkOutputSchema(outputSchema)
print(outputSchema)

# UDFs that extract a single element from the probability array
udf_0 = udf(lambda vector: float(vector[0]), DoubleType())
udf_1 = udf(lambda vector: float(vector[1]), DoubleType())
udf_2 = udf(lambda vector: float(vector[2]), DoubleType())

# Run scoring only when data is actually passed (not when only the data model is computed)
if not ascontext.isComputeDataModelOnly():

    # Read the input data
    indf = ascontext.getSparkInputData()

    # Load the model
    model_path = ascontext.getModelContentToPath("model")
    model = PipelineModel.load(model_path)

    # Run scoring
    r1 = model.transform(indf)

    # Format the scoring results.
    # prediction contains the predicted class; probability contains the
    # probability of belonging to each class as an array.
    # Extract the probability of each class from the probability array
    # and store it in its own column.
    r2 = (r1.select(
            r1["sepal_length"],
            r1["sepal_width"],
            r1["petal_length"],
            r1["petal_width"],
            r1["species"],
            r1["prediction"],
            r1["probability"]
        ).withColumn('probability_0', udf_0(r1.probability)
        ).withColumn('probability_1', udf_1(r1.probability)
        ).withColumn('probability_2', udf_2(r1.probability)
        ).drop("probability")
    )

    # Rename the formatted Spark DataFrame to something easier to understand
    outdf = r2

    # Register the formatted Spark DataFrame in the Analytic Server context
    # so that the scoring results are available to subsequent nodes
    ascontext.setSparkOutputData(outdf)

**1. Preprocessing common to extension nodes**

This is the same as in the model building syntax, so it is omitted here.

**2. Preparation of the scoring result output**

MLlib's random forest returns scoring results in the following format. The probability column holds an array of class probabilities, and the number of probabilities corresponds to the number of categories in the prediction target column.

- prediction: the category label with the highest classification probability
- probability: [probability of category 0, probability of category 1, ..., probability of category N]

In this case, the prediction target column species has three categories, setosa, versicolor, and virginica, so probability is [probability of category 0, probability of category 1, probability of category 2].

To make the scoring results usable in SPSS Modeler, create column definitions for storing them by appending them to the columns of the input data.

There are four columns to add. Of these, the probability_* columns must be created according to the number of categories of the prediction target. The target species has three categories, setosa/versicolor/virginica, so three columns, probability_0 through probability_2, are created.

- prediction: column for the predicted label; contains one of the categories 0/1/2 predicted by the model
- probability_0: probability of classification into category 0
- probability_1: probability of classification into category 1
- probability_2: probability of classification into category 2

Register the column definitions, including the ones for storing the scoring results, in the Analytic Server context as the output schema. In addition, since the classification probabilities for each class are returned as an array, define UDFs that extract each value from the array.

2. Preparation of the scoring result output


# Import pyspark and MLlib modules
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml import PipelineModel
from pyspark.sql import Row
from pyspark.sql.types import DoubleType, StructField
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
import numpy
import json

# Column definitions (schema) to output
outputSchema = ascontext.getSparkInputSchema()  # get the column definitions of the input data
outputSchema.fields.append(StructField('prediction', DoubleType(), nullable=True))    # column for the predicted label
outputSchema.fields.append(StructField('probability_0', DoubleType(), nullable=True)) # column for the probability of category 0
outputSchema.fields.append(StructField('probability_1', DoubleType(), nullable=True)) # column for the probability of category 1
outputSchema.fields.append(StructField('probability_2', DoubleType(), nullable=True)) # column for the probability of category 2

# Register the output column definitions in the Analytic Server context
ascontext.setSparkOutputSchema(outputSchema)
print(outputSchema)

# UDFs that extract a single element from the probability array
udf_0 = udf(lambda vector: float(vector[0]), DoubleType())
udf_1 = udf(lambda vector: float(vector[1]), DoubleType())
udf_2 = udf(lambda vector: float(vector[2]), DoubleType())
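
The snippet above hard-codes three probability_* columns and three UDFs because species has three categories. For a prediction target with a different number of categories, the same definitions could be generated in a loop instead. This is only a sketch under the assumption that n_classes is set to the number of categories noted when the model was built; outputSchema, StructField, DoubleType, and udf are the same as in the syntax above.


# Sketch: generate probability_* columns and extraction UDFs for n_classes categories
n_classes = 3  # assumption: the number of categories noted from labelIndexer.labels

prob_udfs = {}
for i in range(n_classes):
    outputSchema.fields.append(
        StructField('probability_%d' % i, DoubleType(), nullable=True))
    # bind i as a default argument so that each UDF extracts its own element
    prob_udfs[i] = udf(lambda vector, i=i: float(vector[i]), DoubleType())

# Later, after r1 = model.transform(indf), the columns can also be added in a loop:
# for i in range(n_classes):
#     r1 = r1.withColumn('probability_%d' % i, prob_udfs[i](r1['probability']))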

**3. Scoring execution**

Read the input data into the variable indf and load the saved model. The Spark DataFrame containing the scoring results is stored in the variable r1. Format r1 and register it in the Analytic Server context so that it can be used by subsequent nodes.

3. Scoring execution


# Run scoring only when data is actually passed (not when only the data model is computed)
if not ascontext.isComputeDataModelOnly():

    # Read the input data
    indf = ascontext.getSparkInputData()

    # Load the model
    model_path = ascontext.getModelContentToPath("model")
    model = PipelineModel.load(model_path)

    # Run scoring
    r1 = model.transform(indf)

    # Format the scoring results.
    # prediction contains the predicted class; probability contains the
    # probability of belonging to each class as an array.
    # Extract the probability of each class from the probability array
    # and store it in its own column.
    r2 = (r1.select(
            r1["sepal_length"],
            r1["sepal_width"],
            r1["petal_length"],
            r1["petal_width"],
            r1["species"],
            r1["prediction"],
            r1["probability"]
        ).withColumn('probability_0', udf_0(r1.probability)
        ).withColumn('probability_1', udf_1(r1.probability)
        ).withColumn('probability_2', udf_2(r1.probability)
        ).drop("probability")
    )

    # Rename the formatted Spark DataFrame to something easier to understand
    outdf = r2

    # Register the formatted Spark DataFrame in the Analytic Server context
    # so that the scoring results are available to subsequent nodes
    ascontext.setSparkOutputData(outdf)

That's all for creating the syntax.

Finally, rename the Extension Model node. On the Annotations tab, specify "RandomForestClassifier" as the node name and click OK.

2-3. Running the extension model node

Confirm that both syntaxes are complete, then run the Extension Model node. It takes a few minutes to finish.

When the run is complete, a model nugget is added.

You can check the contents by double-clicking the model nugget. Open the Text Output tab and check the first line of the output.

The array on the first line contains the species names. Make a note of it: it lists the species names in the order in which they were encoded as numeric labels, and it is needed when converting the numeric labels back into species names. In this example it is ['versicolor', 'virginica', 'setosa'], so you can see that they were encoded as the numeric labels [0, 1, 2] respectively.

3. Run the test

From the Record Ops tab, add a Select node and connect it to the Partition node.

Open the properties of the Select node and enter Partition = '2_Test' as shown below.

Also, rename the node so that its purpose is easier to understand. On the Annotations tab, specify "Test data" as the node name and click OK.

From the Field Ops tab, add a Filter node and filter out the Partition field. The purpose is to exclude columns that the model does not need when scoring.

Copy and paste the model nugget and connect it to the Filter node.

From the Record Ops tab, add an Extension Transform node and connect it to the model nugget.

Open the properties of the Extension Transform node and write the process that converts the numeric prediction label back into the species name. For my_labels, specify the array you noted down after building the model. This process stores the predicted species in a column called prediction_species.

Converting the prediction label back to the species name


# Analytic Server context and Spark context
import spss.pyspark.runtime

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()

# Species names in the order noted from the model nugget output
my_labels = ['versicolor', 'virginica', 'setosa']

from pyspark.ml.feature import IndexToString
from pyspark.sql.types import StringType, DoubleType, StructField
import numpy

# Add the prediction_species column to the output schema
outputSchema = ascontext.getSparkInputSchema()
outputSchema.fields.append(StructField('prediction_species', StringType(), nullable=True))
ascontext.setSparkOutputSchema(outputSchema)

if not ascontext.isComputeDataModelOnly():
    indf = ascontext.getSparkInputData()

    # Convert the numeric prediction label back into the species name
    converter = IndexToString(
        inputCol="prediction",
        outputCol="prediction_species",
        labels=my_labels
    )
    outdf = converter.transform(indf)

    # Return the output DataFrame as the result
    ascontext.setSparkOutputData(outdf)


From the Output tab, add a Matrix (crosstab) node and connect it to the Extension Transform node.

Set the properties of the Matrix node as follows:

- Rows: species
- Columns: prediction_species


On the Appearance tab of the Matrix node, check the following:

- Frequency
- Column percentage
- Include row and column totals


Run the Matrix node to output the confusion matrix for the test data and check how well the predictions match the actual species. In this example, one versicolor record is misclassified as virginica, but all of the other records are classified into the correct species.
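
Incidentally, a similar check could also be done in Python, for example with an Extension Output node connected after the Extension Transform node instead of the Matrix node. The following is only a minimal sketch under that assumption, using DataFrame.crosstab; the field names follow the stream built above.


# Sketch (assumption): run in an Extension Output node after the Extension Transform node
import spss.pyspark.runtime

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()

indf = ascontext.getSparkInputData()

# Confusion matrix of actual species vs. predicted species
indf.crosstab("species", "prediction_species").show()

# Overall accuracy on the test data
total = indf.count()
correct = indf.filter(indf["species"] == indf["prediction_species"]).count()
print("Accuracy = %g" % (float(correct) / total))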

That's all for the procedure.

References

See below for more information on the Analytic Server context.

Script using Python for Spark https://www.ibm.com/support/knowledgecenter/ja/SS3RA7_18.2.2/modeler_r_nodes_ddita/clementine/r_pyspark_api.html

Analytic Server Context https://www.ibm.com/support/knowledgecenter/ja/SS3RA7_18.2.2/modeler_r_nodes_ddita/clementine/r_pyspark_api_context.html
