Spark ML a un mécanisme appelé «Pipeline» qui combine les opérations de conversion continue des trames de données en une seule. Son utilisation facilitera l'écriture du code et améliorera l'efficacité de l'utilisation de la mémoire dans Spark.
Le flux général est le suivant
Réécrivons l '[Analyse des composants principaux avec Spark ML] précédemment publiée (http://qiita.com/knoguchi/items/7714a1cc9be56588b72a) en utilisant Pipeline.
L'étape dans le pipeline s'appelle l'étape. Dans l'exemple de l'analyse en composantes principales, il y avait les trois étapes suivantes.
Déclarez le pipeline en utilisant ces trois éléments. df est une trame de données contenant des données d'entrée. Voir l'article précédent pour plus de détails sur df.
from pyspark.ml.pipeline import Pipeline
#Chaque étape du pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")
#Déclaration de pipeline
pipeline = Pipeline(stages=[
assembler,
scaler,
pca
])
Entrez les données dans le pipeline construit et créez un modèle.
model = pipeline.fit(df)
result = model.transform(df)
result.select("Score du composant principal").show(truncate=False)
J'ai obtenu les mêmes résultats que lorsque je les ai exécutés individuellement dans l'article précédent.
+---------------------------------------------------------------+
|Score du composant principal|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
Les objets de scène peuvent être référencés avec model.stages []
. Faisons référence au modèle PCA de la troisième étape.
print("====Vecteur unique====")
print(model.stages[2].pc)
print("====Taux de cotisation====")
print(model.stages[2].explainedVariance)
En utilisant Pipeline, les variables intermédiaires ont disparu et le code a été écrit proprement. Vous pouvez également faire référence à des modèles individuels pour chaque étape, il ne semble donc y avoir aucune raison de ne pas utiliser Pipeline.
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Données brutes====")
df.show(truncate=False)
#Préparer les pièces du pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")
pipeline = Pipeline(stages=[
assembler,
scaler,
pca
])
#Exécutez le pipeline pour créer un modèle à partir des données d'entrée
model = pipeline.fit(df)
#Exécutez le modèle
result = model.transform(df)
result.show(truncate=False)
#Étape du pipeline.Peut être référencé par étapes
print("====Vecteur unique====")
print(model.stages[2].pc)
print("====Taux de cotisation====")
print(model.stages[2].explainedVariance)
Recommended Posts