Dies ist ein Test zum Ausführen von Spark auf iPython Notebook (Jupyter) und zum Ausführen von MLlib. Ich habe versucht, Clustering (KMeans), Klassifizierung: Klassifizierung (SVM, logistische Regression, Random Forest) mit Irisdaten.
Umgebung
Bitte beachten Sie, dass dieser Artikel beschreibt, was in der oben genannten Umgebung getan wurde, sodass die Einstellungen in anderen Umgebungen abweichen können.
http://spark.apache.org/downloads.html Von spark-1.5.0-bin-hadoop2.6.tgz Herunterladen. (Stand 14. September 2015)
Beantworten Sie die heruntergeladene Binärdatei und platzieren Sie sie an der entsprechenden Stelle. Hier wird es in / usr / local / bin /
platziert.
tar zxvf spark-1.5.0-bin-hadoop2.6.tar
mv spark-1.5.0-bin-hadoop2.6 /usr/local/bin/
Setzen Sie die Umgebungsvariable SPARK_HOME
auf .bashrc. Bitte fügen Sie Folgendes hinzu.
(Laden Sie zum ersten Mal mit source ~ / .bashrc
neu und lesen Sie die Umgebungsvariablen.)
.bashrc
export SPARK_HOME=/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Starten Sie iPython Notebook, wenn Sie bereit sind.
ipython notebook
Führen Sie den folgenden Code in iPython Notebook aus.
import os, sys
from datetime import datetime as dt
print "loading PySpark setting..."
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
Wenn Sie so etwas sehen, sind Sie erfolgreich: Lachen:
loading PySpark setting...
/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.5.0
/_/
Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.
Verwenden Sie den bekannten Iris-Datensatz. Ich werde dies verwenden, weil es einfacher zu bekommen ist als Scikit-lernen. Probieren Sie vorerst die Kombination aus ('Kelchblattlänge', 'Blütenblattlänge'). Wie auch immer, zeichnen wir das Streudiagramm.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import datasets
plt.style.use('ggplot')
# http://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html
iris = datasets.load_iris()
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)
plt.show()
Drei Arten von Iris-Setosa, Iris-Versicolour und Iris-Virginica werden in verschiedenen Farben angezeigt.
Aus diesen Daten werden wir KMeans ausprobieren, eine der Clustering-Methoden für unbeaufsichtigtes Lernen. Da es ursprünglich drei Arten von Iris gibt, setzen wir k = 3 und prüfen, ob es richtig beurteilt werden kann.
# http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering
from pyspark.mllib.clustering import KMeans
k = 3
d_start = dt.now()
#Konvertieren Sie Daten, damit Spark lesen kann
data = sc.parallelize(iris.data[:,[0,2]])
#Lernen Sie mit KMeans
model = KMeans.train(data, k, initializationMode="random", seed=None)
#Ergebnisanzeige
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
diff = dt.now() - d_start
print("{}: [end] {}".format(dt.now().strftime('%H:%M:%S'), diff ))
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
plt.show()
Irgendwie kann das mit K Means berechnete Zentrum nahe der Mitte jedes Typs aufgetragen werden: zwinker: (Das Ergebnis von KMeans hängt vom Anfangswert ab, daher kann es seltsamer sein.)
Es gibt nur wenige Teile, die Spark verwenden. Wenn Sie jedoch die verteilte Verarbeitung richtig einstellen, wird die Verarbeitung verteilt. (Diesmal handelt es sich um eine Testversion mit Standalone, daher wird sie nicht verteilt.)
Der Punkt ist, dass Numpys Ndarray von sc.parallelize () in Spark's RDD konvertiert und an den Trainingsfunktionszug () der Spark-KMeans-Klasse übergeben wird.
data = sc.parallelize(iris.data[:,[0,2]])
model = KMeans.train(data, k, initializationMode="random", seed=None)
Verwenden Sie model.clusterCenters
, um die Mittelpunkte für jeden Cluster in k Gruppen abzurufen. Im folgenden Code erhalten wir k = 3 Zentren und zeichnen die Markierung ▲.
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
Lassen Sie uns abschließend visualisieren, wie jeder Punkt der Koordinaten gruppiert wurde.
# ------- Create Color Map ------- #
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
n = 100
xx = np.linspace(xmin, xmax, n)
yy = np.linspace(ymin, ymax, n)
X, Y = np.meshgrid(xx, yy)
# 2015.9.15 Ergänzung: Verteilte Verarbeitung von Vorhersagen
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Vorhersageausführung, welcher Punkt aus den gelernten Daten klassifiziert wird
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Löschen
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^", zorder=100)
plt.pcolor(X, Y, Z, alpha=0.3)
In ähnlicher Weise handelt es sich diesmal um eine Support-Vektor-Maschine. Da dies eine binäre Klassifizierung ist, werden wir die Irisdaten auf zwei Typen eingrenzen.
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
#Da SVM eine binäre Klassifikation ist, beschränken Sie sich auf zwei Typen
idx = np.r_[ np.where(iris.target == 1)[0], np.where(iris.target == 2)[0]]
#Konvertieren Sie Daten, damit Spark lesen kann
dat = np.column_stack([iris.target[idx]-1, iris.data[idx,0],iris.data[idx,2]])
data = sc.parallelize(dat)
def parsePoint(vec):
return LabeledPoint(vec[0], vec[1:])
parsedData = data.map(parsePoint)
#Lernen Sie die Ausführung mit SVM
model = SVMWithSGD.train(parsedData, iterations=5000)
# ------- Predict Data ------- #
# 2015.9.15 Ergänzung: Verteilte Verarbeitung von Vorhersagen
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Vorhersageausführung, welcher Punkt aus den gelernten Daten klassifiziert wird
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Löschen
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 2
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Punkte zeichnen
for i, color in enumerate('rb'):
idx = np.where(iris.target == i+1)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Zeichnung füllen
plt.pcolor(X, Y, Z, alpha=0.3)
Logistische Regression. Dies ist auch eine binäre Klassifikation.
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
#Ausführung mit logistischer Regression lernen
model = LogisticRegressionWithLBFGS.train(parsedData)
# ------- Predict Data ------- #
# 2015.9.15 Ergänzung: Verteilte Verarbeitung von Vorhersagen
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Vorhersageausführung, welcher Punkt aus den gelernten Daten klassifiziert wird
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Löschen
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Punkte zeichnen
for i, color in enumerate('rb'):
idx = np.where(iris.target == i+1)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Zeichnung füllen
plt.pcolor(X, Y, Z, alpha=0.3)
Schließlich gibt es einen zufälligen Wald. Da hier eine mehrwertige Klassifizierung möglich ist, werden wir auch nach 3 Arten von Iris klassifizieren.
from pyspark.mllib.tree import RandomForest, RandomForestModel
#Konvertieren Sie Daten, damit Spark lesen kann
dat = np.column_stack([iris.target[:], iris.data[:,0],iris.data[:,2]])
data = sc.parallelize(dat)
parsedData = data.map(parsePoint)
#Unterteilt in Trainingsdaten und Testdaten
(trainingData, testData) = parsedData.randomSplit([0.7, 0.3])
#Führen Sie das Lernen in einem zufälligen Wald aus
model = RandomForest.trainClassifier(trainingData, numClasses=3,
categoricalFeaturesInfo={},
numTrees=5, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
Die Baumstruktur des Klassifizierungsergebnisses kann auch mit toDebugString ()
angezeigt werden.
out
Test Error = 0.0588235294118
Learned classification forest model:
TreeEnsembleModel classifier with 5 trees
Tree 0:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.8)
If (feature 1 <= 5.0)
If (feature 0 <= 6.3)
Predict: 2.0
Else (feature 0 > 6.3)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Tree 1:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.7)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 0 <= 6.5)
Predict: 2.0
Else (feature 0 > 6.5)
If (feature 1 <= 5.0)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Tree 2:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
If (feature 1 <= 4.7)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 0 <= 5.9)
Predict: 1.0
Else (feature 0 > 5.9)
Predict: 2.0
Else (feature 1 > 4.8)
Predict: 2.0
Tree 3:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
Predict: 1.0
Else (feature 1 > 4.8)
Predict: 2.0
Tree 4:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.7)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 1 <= 5.0)
If (feature 0 <= 6.0)
Predict: 2.0
Else (feature 0 > 6.0)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Zeichnen Sie das Ergebnis.
# ------- Predict Data ------- #
Z = np.zeros_like(X)
for i in range(n):
for j in range(n):
Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Punkte zeichnen
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Zeichnung füllen
plt.pcolor(X, Y, Z, alpha=0.3)
Als nächstes werden wir uns mit Empfehlungen befassen. "[Maschinelles Lernen] Führen Sie Spark MLlib mit Python aus und geben Sie Empfehlungen ab." http://qiita.com/kenmatsu4/items/42fa2f17865f7914688d
Bei der Parallelisierung von Predict () von RandomForest tritt ein Fehler auf, und wir suchen nach einer Lösung.
-> Anscheinend ist das die Ursache ... "Spark / RDD kann nicht verschachtelt werden!" Ich frage mich, ob es gelöst werden kann ...
How-to: Use IPython Notebook with Apache Spark http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/
Spark 1.5.0 Machine Learning Library (MLlib) Guide http://spark.apache.org/docs/latest/mllib-guide.html
Recommended Posts