Spark ML verfügt über einen Mechanismus namens "Pipeline", der kontinuierliche Konvertierungsvorgänge von Datenrahmen in einem kombiniert. Dies erleichtert das Schreiben von Code und verbessert die Effizienz der Speichernutzung in Spark.
Der allgemeine Fluss ist wie folgt
Lassen Sie uns die zuvor veröffentlichte Hauptkomponentenanalyse mit Spark ML mithilfe von Pipeline neu schreiben.
Die Stufe in der Pipeline wird als Stufe bezeichnet. Im Beispiel der Hauptkomponentenanalyse gab es die folgenden drei Stufen.
Deklarieren Sie die Pipeline mit diesen drei. df ist ein Datenrahmen, der Eingabedaten enthält. Weitere Informationen zu df finden Sie im vorherigen Artikel.
from pyspark.ml.pipeline import Pipeline
#Jede Stufe der Pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="Variable")
scaler = StandardScaler(inputCol="Variable", outputCol="標準化Variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Standardisierte Variable", outputCol="Hauptkomponentenbewertung")
#Pipeline-Erklärung
pipeline = Pipeline(stages=[
assembler,
scaler,
pca
])
Geben Sie Daten in die erstellte Pipeline ein und erstellen Sie ein Modell.
model = pipeline.fit(df)
result = model.transform(df)
result.select("Hauptkomponentenbewertung").show(truncate=False)
Ich habe die gleichen Ergebnisse erzielt, als ich sie im vorherigen Artikel einzeln ausgeführt habe.
+---------------------------------------------------------------+
|Hauptkomponentenbewertung|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
Bühnenobjekte können mit model.stages []
referenziert werden. Wir beziehen uns auf das PCA-Modell der dritten Stufe.
print("====Einzigartiger Vektor====")
print(model.stages[2].pc)
print("====Beitragssatz====")
print(model.stages[2].explainedVariance)
Durch die Verwendung von Pipeline verschwanden die Zwischenvariablen und der Code wurde ordentlich geschrieben. Sie können sich auch auf einzelne Modelle für jede Stufe beziehen, sodass es keinen Grund zu geben scheint, Pipeline nicht zu verwenden.
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Rohdaten====")
df.show(truncate=False)
#Rohrleitungsteile vorbereiten
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="Variable")
scaler = StandardScaler(inputCol="Variable", outputCol="標準化Variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="Standardisierte Variable", outputCol="Hauptkomponentenbewertung")
pipeline = Pipeline(stages=[
assembler,
scaler,
pca
])
#Führen Sie die Pipeline aus, um aus den Eingabedaten ein Modell zu erstellen
model = pipeline.fit(df)
#Führen Sie das Modell aus
result = model.transform(df)
result.show(truncate=False)
#Pipeline-Bühne.Kann schrittweise referenziert werden
print("====Einzigartiger Vektor====")
print(model.stages[2].pc)
print("====Beitragssatz====")
print(model.stages[2].explainedVariance)
Recommended Posts