Die MLlib von Spark wird auf ML migriert. Ab Spark 2.0 wird die RDD-basierte MLlib-API nur noch gewartet, und die DataFrame-basierte API wird in Zukunft der Standard sein. Hier verwenden wir die ML-API in PySpark, um eine Hauptkomponentenanalyse durchzuführen.
Lassen Sie uns in Spark ein Beispiel implementieren, das die Trends des Papiers aus der Bewertung der Erfüllung von Nachrichten, Unternehmen und Sport der 10 unten verlinkten Zeitungen analysiert.
http://ifs.nog.cc/gucchi24.hp.infoseek.co.jp/SHUSEIEX.htm
In Bezug auf 10 Zeitungen wurde der Inhalt des Artikels auf einer 10-Punkte-Skala für Nachrichten, Unternehmen und Sport untersucht. Die Skala ist sehr gut von 0 bis 10, aber nicht gut, aber 0.
news.csv
no,Nachrichten,Unternehmen,Sport
1,8,9,4
2,2,5,7
3,8,5,6
4,3,5,4
5,7,4,9
6,4,3,4
7,3,6,8
8,6,8,2
9,5,4,5
10,6,7,6
Früher war es Standard, mit "SparkContext" zu beginnen, aber in 2.0 werden wir "ParkSession" verwenden. SQLContext
, HiveContext
wurden in SparkSession
integriert.
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
Lesen Sie die CSV mit "spark.read.csv" und speichern Sie sie im DataFrame.
ex1.py
df = spark.read.csv(filename, header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Rohdaten====")
df.show(truncate=False)
Wenn Japanisch enthalten ist, wird die Tabelle reduziert. Die Zeichenbreite wird nicht berücksichtigt.
$ export PYTHONIOENCODING=utf8
$ spark-submit ex1.py
====Rohdaten====
+---+----+----+----+
|no |Nachrichten|Unternehmen|Sport|
+---+----+----+----+
|1 |8 |9 |4 |
|2 |2 |5 |7 |
|3 |8 |5 |6 |
|4 |3 |5 |4 |
|5 |7 |4 |9 |
|6 |4 |3 |4 |
|7 |3 |6 |8 |
|8 |6 |8 |2 |
|9 |5 |4 |5 |
|10 |6 |7 |6 |
+---+----+----+----+
PCA () erfordert eine Variation in Vektorform. Verwenden Sie VectorAssembler
, um [News, Business, Sports] in einen Vektor zu konvertieren und in der Spalte Variant
zu speichern. .transform (df)
erstellt einen neuen DataFrame.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="Variable")
feature_vectors = assembler.transform(df)
feature_vectors.show(truncate=False)
Der Spalte "Variante" wurde ein Vektor hinzugefügt.
+---+----+----+----+-------------+
| no|Nachrichten|Unternehmen|Sport|Variable|
+---+----+----+----+-------------+
| 1| 8| 9| 4|[8.0,9.0,4.0]|
| 2| 2| 5| 7|[2.0,5.0,7.0]|
| 3| 8| 5| 6|[8.0,5.0,6.0]|
| 4| 3| 5| 4|[3.0,5.0,4.0]|
| 5| 7| 4| 9|[7.0,4.0,9.0]|
| 6| 4| 3| 4|[4.0,3.0,4.0]|
| 7| 3| 6| 8|[3.0,6.0,8.0]|
| 8| 6| 8| 2|[6.0,8.0,2.0]|
| 9| 5| 4| 5|[5.0,4.0,5.0]|
| 10| 6| 7| 6|[6.0,7.0,6.0]|
+---+----+----+----+-------------+
Nach dem Verbindungsziel werden die Daten vor der Berechnung standardisiert. Bei der Hauptkomponentenanalyse ist es im Allgemeinen besser, vor der Berechnung zu standardisieren. In ML gibt es einen "Standard Scaler". Verwenden Sie diesen.
Die Eingabe ist ein Vektor der Spalte "Variante" und die Ausgabe ist die "standardisierte Variable". Diese API erstellt zuerst ein Modell aus den Eingabedaten mit ".fit", gibt dann die Eingabedaten erneut mit ".transform" aus und transformiert sie tatsächlich.
from pyspark.ml.feature import StandardScaler
# step1
scaler = StandardScaler(inputCol="Variable", outputCol="標準化Variable", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
# step2
std_feature_vectors = scalerModel.transform(feature_vectors)
#Nur standardisierte Variablen anzeigen
print("====Standardisierte Daten====")
std_feature_vectors.select("Standardisierte Variable").show(truncate=False)
Es unterscheidet sich geringfügig von der verknüpften Tabelle. StandardScaler
verwendet eine unverzerrte Verteilung (n-1), aber das Verbindungsziel verwendet eine Beispielverteilung (n). Weitere Informationen finden Sie in anderen Artikeln.
====Standardisierte Daten====
+---------------------------------------------------------------+
|Standardisierte Variable|
+---------------------------------------------------------------+
|[1.3023647131866891,1.7919573407620815,-0.7071067811865476] |
|[-1.4884168150705013,-0.3162277660168382,0.7071067811865476] |
|[1.3023647131866891,-0.3162277660168382,0.23570226039551587] |
|[-1.0232865603609695,-0.3162277660168382,-0.7071067811865476] |
|[0.8372344584771575,-0.8432740427115681,1.649915822768611] |
|[-0.5581563056514377,-1.370320319406298,-0.7071067811865476] |
|[-1.0232865603609695,0.21081851067789167,1.1785113019775793] |
|[0.3721042037676257,1.2649110640673515,-1.649915822768611] |
|[-0.09302605094190601,-0.8432740427115681,-0.23570226039551587]|
|[0.3721042037676257,0.7378647873726216,0.23570226039551587] |
+---------------------------------------------------------------+
PCA
Schließlich können Sie die Hauptkomponentenanalyse-API aufrufen. Die Eingabe ist eine "standardisierte Variable" und die Ausgabe ist eine "Hauptkomponentenbewertung". Erstellen Sie wie beim obigen "Standard Scaler" zuerst ein Modell und führen Sie dann die eigentlichen Berechnungen durch. Der Eigenvektor und der Beitragssatz können aus dem konstruierten Modell erhalten werden. k = 3 ist eine Anweisung zur Berechnung bis zur dritten Hauptkomponente. Ursprünglich wird k auf einen großen Wert gesetzt und einmal berechnet, dann wird k so ausgewählt, dass die kumulierte Summe aus dem höchsten Beitragssatz etwa 80% beträgt, und erneut berechnet.
from pyspark.ml.feature import PCA
pca = PCA(k=3, inputCol="Standardisierte Variable", outputCol="Hauptkomponentenbewertung")
pcaModel = pca.fit(std_feature_vectors)
print("====Einzigartiger Vektor====")
print(pcaModel.pc)
print("====Beitragssatz====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Hauptkomponentenbewertung")
print("====Hauptkomponentenbewertung====")
pca_score.show(truncate=False)
Was das Ergebnis des Eigenvektors betrifft, so ist die erste Spalte (die am weitesten links liegende vertikale Spalte) der Eigenvektor der ersten Hauptkomponente, die zweite Spalte die zweite Hauptkomponente und die dritte Spalte die dritte Hauptkomponente.
Der Beitragssatz betrug 52% für die erste Hauptkomponente, 30% für die zweite Hauptkomponente und 17,6% für die dritte Hauptkomponente. Da die kumulierte Summe der ersten und zweiten 82% beträgt, kann die dritte Hauptkomponente entfernt werden. In diesem Fall setze k = 2.
Der Eigenwert wird ** nicht erhalten **, aber der Beitragssatz ist "die Summe der Eigenwerte / Eigenwerte", so dass in den meisten Fällen der Beitragssatz ausreichend sein sollte.
Die Hauptkomponentenbewertungen jeder Zeitung sind die erste Hauptkomponente in der ersten Spalte, die zweite Hauptkomponente in der zweiten Spalte und die dritte Hauptkomponente in der dritten Spalte. Der Code der Bewertung der ersten Hauptkomponente steht verkehrt herum als Linkziel. Nur weil der Vektor um 180 Grad gegenüberliegend ausgerichtet ist, hat dies keinen Einfluss auf das Analyseergebnis. Die Werte unterscheiden sich geringfügig aufgrund des Unterschieds in der unverzerrten Verteilung und der Stichprobenverteilung.
====Einzigartiger Vektor====
DenseMatrix([[-0.53130806, 0.68925233, -0.49258803],
[-0.67331251, 0.00933405, 0.73929908],
[ 0.51416145, 0.72446125, 0.45912296]])
====Beitragssatz====
[0.52355344314,0.300887148322,0.175559408538]
====Hauptkomponentenbewertung====
+---------------------------------------------------------------+
|Hauptkomponentenbewertung|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
Wir haben ein Beispiel für die Hauptkomponentenanalyse mit DataFrame, PCA und Standard Scaler unter den ML-APIs von Spark vorgestellt.
Eine Analyse der Zeitungstrends finden Sie im verlinkten Artikel (^^;
pca.py
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Rohdaten====")
df.show(truncate=False)
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="Variable")
feature_vectors = assembler.transform(df)
feature_vectors.show()
scaler = StandardScaler(inputCol="Variable", outputCol="標準化Variable", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
std_feature_vectors = scalerModel.transform(feature_vectors)
print("====Standardisierte Daten====")
std_feature_vectors.select("Standardisierte Variable").show(truncate=False)
# build PCA model
pca = PCA(k=3, inputCol="Standardisierte Variable", outputCol="Hauptkomponentenbewertung")
pcaModel = pca.fit(std_feature_vectors)
print("====Einzigartiger Vektor====")
print(pcaModel.pc)
print("====Beitragssatz====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Hauptkomponentenbewertung")
print("====Hauptkomponentenbewertung====")
pca_score.show(truncate=False)
Recommended Posts