[PYTHON] Comment utiliser le pipeline Spark ML

Spark ML a un mécanisme appelé «Pipeline» qui combine les opérations de conversion continue des trames de données en une seule. Son utilisation facilitera l'écriture du code et améliorera l'efficacité de l'utilisation de la mémoire dans Spark.

Le flux général est le suivant

Réécrivons l '[Analyse des composants principaux avec Spark ML] précédemment publiée (http://qiita.com/knoguchi/items/7714a1cc9be56588b72a) en utilisant Pipeline.

Scène et pipeline

L'étape dans le pipeline s'appelle l'étape. Dans l'exemple de l'analyse en composantes principales, il y avait les trois étapes suivantes.

Déclarez le pipeline en utilisant ces trois éléments. df est une trame de données contenant des données d'entrée. Voir l'article précédent pour plus de détails sur df.

from pyspark.ml.pipeline import Pipeline

#Chaque étape du pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")

#Déclaration de pipeline
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

Génération de modèle

Entrez les données dans le pipeline construit et créez un modèle.

model = pipeline.fit(df)

Exécution du modèle

result = model.transform(df)
result.select("Score du composant principal").show(truncate=False)

Résultat d'exécution

J'ai obtenu les mêmes résultats que lorsque je les ai exécutés individuellement dans l'article précédent.

+---------------------------------------------------------------+
|Score du composant principal|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674]   |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186]     |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913]  |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901]   |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238]    |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996]   |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933]     |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249]  |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967]  |
+---------------------------------------------------------------+

Comment se référer à la scène

Les objets de scène peuvent être référencés avec model.stages []. Faisons référence au modèle PCA de la troisième étape.

print("====Vecteur unique====")
print(model.stages[2].pc)

print("====Taux de cotisation====")
print(model.stages[2].explainedVariance)

Résumé

En utilisant Pipeline, les variables intermédiaires ont disparu et le code a été écrit proprement. Vous pouvez également faire référence à des modèles individuels pour chaque étape, il ne semble donc y avoir aucune raison de ne pas utiliser Pipeline.

Toutes les sources

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Initialize SparkSession
spark = (SparkSession
         .builder
         .appName("news")
         .enableHiveSupport()
         .getOrCreate())

# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')

print("====Données brutes====")
df.show(truncate=False)

#Préparer les pièces du pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")

pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

#Exécutez le pipeline pour créer un modèle à partir des données d'entrée
model = pipeline.fit(df)

#Exécutez le modèle
result = model.transform(df)
result.show(truncate=False)

#Étape du pipeline.Peut être référencé par étapes
print("====Vecteur unique====")
print(model.stages[2].pc)

print("====Taux de cotisation====")
print(model.stages[2].explainedVariance)

Recommended Posts

Comment utiliser le pipeline Spark ML
Comment utiliser le générateur
Comment utiliser le décorateur
Comment utiliser la fonction zip
Comment utiliser le module optparse
Comment utiliser le module ConfigParser
[Linux] Comment utiliser la commande echo
Comment utiliser le débogueur IPython (ipdb)
Comment utiliser xml.etree.ElementTree
Comment utiliser Python-shell
Remarques sur l'utilisation de tf.data
Comment utiliser virtualenv
Comment utiliser Seaboan
Comment utiliser la correspondance d'image
Comment utiliser le shogun
Comment utiliser Pandas 2
Comment utiliser Virtualenv
Comment utiliser numpy.vectorize
Comment utiliser pytest_report_header
Comment utiliser partiel
Comment utiliser Bio.Phylo
Comment utiliser SymPy
Comment utiliser x-means
Comment utiliser WikiExtractor.py
Comment utiliser IPython
Comment utiliser virtualenv
Comment utiliser Matplotlib
Comment utiliser iptables
Comment utiliser numpy
Comment utiliser TokyoTechFes2015
Comment utiliser venv
Comment utiliser le dictionnaire {}
Comment utiliser Pyenv
Comment utiliser la liste []
Comment utiliser python-kabusapi
Comment utiliser OptParse
Comment utiliser le retour
Comment utiliser pyenv-virtualenv
Comment utiliser imutils
Comment utiliser la bibliothèque C en Python
Comment utiliser MkDocs pour la première fois
Comment utiliser la bibliothèque de dessins graphiques Bokeh
Comment utiliser l'API Google Cloud Translation
Comment utiliser l'API du guide des programmes NHK
[Algorithm x Python] Comment utiliser la liste
Comment utiliser Qt Designer
Comment utiliser la recherche triée
[gensim] Comment utiliser Doc2Vec
python3: Comment utiliser la bouteille (2)
Comprendre comment utiliser django-filter
[Python] Comment utiliser la liste 1
Comment utiliser Raspeye Relay Module Python
Comment utiliser FastAPI ③ OpenAPI
Comment utiliser Python Argparse
Comment utiliser IPython Notebook
Comment utiliser Pandas Rolling
[Note] Comment utiliser virtualenv