[PYTHON] How to use the Spark ML pipeline

Spark ML provides a mechanism called Pipeline that chains a sequence of DataFrame transformations into a single unit. Using it makes the code easier to write and improves memory efficiency inside Spark.

The general flow is as follows: define each stage, combine the stages into a Pipeline, fit the Pipeline to the input data to build a model, and run the model to transform the data.

Let's rewrite the principal component analysis with Spark ML from the previous post using a Pipeline.

Stages and the pipeline

Each processing step in a Pipeline is called a stage. The principal component analysis example had the following three stages: assembling the variables into a single vector (VectorAssembler), standardizing them (StandardScaler), and computing the principal component scores (PCA).

Declare the Pipeline using these three stages. df is the DataFrame that holds the input data; see the previous article for details on df.

from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Each stage of the Pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variables")
scaler = StandardScaler(inputCol="variables", outputCol="standardized_variables", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="standardized_variables", outputCol="principal_component_scores")

# Declare the Pipeline
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

Model generation

Feed the data into the constructed pipeline to create a model.

model = pipeline.fit(df)
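
Incidentally, pipeline.fit() returns a PipelineModel whose stages attribute holds the fitted transformers. A minimal sketch to confirm this (not part of the original article):

# pipeline.fit() returns a PipelineModel; its stages hold the fitted transformers
print(type(model))  # <class 'pyspark.ml.pipeline.PipelineModel'>
for i, stage in enumerate(model.stages):
    print(i, stage)  # VectorAssembler, StandardScalerModel, PCAModel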

Model execution

result = model.transform(df)
result.select("Main component score").show(truncate=False)

Execution result

The results are the same as when the steps were run individually in the previous article.

+---------------------------------------------------------------+
|principal_component_scores                                     |
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674]   |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186]     |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913]  |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901]   |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238]    |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996]   |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933]     |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249]  |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967]  |
+---------------------------------------------------------------+

How to refer to a stage

Fitted stage objects can be referenced through model.stages[]. Let's look at the PCA model in the third stage.

print("====Eigenvector====")
print(model.stages[2].pc)

print("====Contribution rate====")
print(model.stages[2].explainedVariance)
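
The other fitted stages can be inspected in the same way. For example (a minimal sketch, not part of the original article), the StandardScalerModel in the second stage exposes the means and standard deviations it learned:

# stages[1] is the fitted StandardScalerModel (stage order as declared above)
print("====Mean of each variable====")
print(model.stages[1].mean)

print("====Standard deviation of each variable====")
print(model.stages[1].std)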

Summary

Using Pipeline eliminated the intermediate variables and made the code tidier. The individual model of each stage can still be referenced, so there seems to be no reason not to use Pipeline.
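
As a hedged aside beyond the original article, a fitted PipelineModel can also be saved and reloaded as a single unit, which keeps all three stages together (the path below is just an example):

from pyspark.ml.pipeline import PipelineModel

# Save the whole fitted pipeline (assembler, scaler and PCA together)
model.write().overwrite().save("pca_pipeline_model")

# Reload it later and apply it to new data with the same schema
loaded_model = PipelineModel.load("pca_pipeline_model")
loaded_model.transform(df).select("principal_component_scores").show(truncate=False)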

Full source code

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Initialize SparkSession
spark = (SparkSession
         .builder
         .appName("news")
         .enableHiveSupport()
         .getOrCreate())

# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')

print("====Raw data====")
df.show(truncate=False)

# Prepare the pipeline stages
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variables")
scaler = StandardScaler(inputCol="variables", outputCol="standardized_variables", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="standardized_variables", outputCol="principal_component_scores")

pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

# Fit the pipeline to the input data to build a model
model = pipeline.fit(df)

# Run the model
result = model.transform(df)
result.show(truncate=False)

# Each fitted stage can be referenced through model.stages
print("====Eigenvector====")
print(model.stages[2].pc)

print("====Contribution rate====")
print(model.stages[2].explainedVariance)
