Maschinelles Lernen in Spark ist MLlib, aber es scheint, dass es dem Scicit-Learn in Bezug auf die Funktionalität immer noch unterlegen ist. Zum Beispiel kann Scikit-Learn verwendet werden, um zu korrigieren, wenn die Anzahl der positiven und negativen Beispiele während des Trainings ungleichmäßig ist, aber mllib 1.5 hat noch keine solche Funktion [^ Korrektur der Klassenbezeichnung]. In einem solchen Fall denke ich, dass die Vorteile von sklearn und spark genutzt werden können, wenn ein Lernender im Voraus mit scicit-learn mit genügend Daten erstellt wird, um in den Speicher zu passen, und es kann zur Vorhersage großer Datenmengen mit pyspark verwendet werden. Ich bin.
Let's Try
Sie können die Daten in RDD von ndarray konvertieren und an die Vorhersage des von RDD map trainierten Modells übergeben. Wenn Sie dies jedoch so tun, wie es ist, scheint der Overhead des Funktionsaufrufs groß zu sein, sodass ich es in Batch-Einheiten einer bestimmten Größe verarbeiten möchte. Ich werde.
Bereiten Sie eine Python-Umgebung vor, die "scikit-learn" wie Anaconda im selben Pfad ("/ opt / anaconda" usw.) für alle Funkenknoten verwenden kann. Wenn Sie beim Ausführen des Befehls spark-submit "PYSPARK_PYTHON = / opt / anaconda / bin / python3" angeben, wird dieser Python verwendet.
Erstellen Sie im Voraus ein Lernmodell. Diesmal habe ich Random Forest verwendet. Die Daten sind angemessen.
import numpy as np
from sklearn import ensemble
N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.1, N)
model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)
Verwenden Sie dies in PySpark wie folgt:
from pyspark import SparkContext
sc = SparkContext()
test_x = np.random.randn(N * 100, 10)
n_partitions = 10
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()
# Point 1
def batch(xs):
yield list(xs)
batch_rdd = rdd.mapPartitions(batch)
# Point 2
b_model = sc.broadcast(model)
def split_id_and_data(xs):
xs = list(xs)
data = [x[0] for x in xs]
ids = [x[1] for x in xs]
return data, ids
# Point 3
result_rdd = batch_rdd.map(split_id_and_data) \
.flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))
for _id, pred in result_rdd.take(10):
print(_id, pred)
sc.stop()
Die Punkte sind die folgenden drei Punkte
model.predict
zusammen übergeben.b_model.value.predict
. Wenn Sie dies und die IDs erneut komprimieren und in flatMap einfügen, wird es abgeschlossen(Nachtrag vom 26.01.2016) In einer Liste ohne Partition zusammenstellen
# Point 1
def batch(xs):
yield list(xs)
batch_rdd = rdd.mapPartitions(batch)
Ursprünglich wurde für das Teil eine Methode namens "glom" vorbereitet.
batch_rdd = rdd.glom()
(26.01.2016 Nachtrag 2)
DStream verfügt auch über die Methoden glom
und flatMap
, sodass Sie es für Spark Streaming genauso verwenden können. Es kann gesagt werden, dass mit SVM ein Lerner zur Erkennung von Anomalien erstellt und in Echtzeit auf das Streaming von Daten angewendet wird.
[^ Korrektur der Klassenbezeichnung]: JIRA wurde möglicherweise bald angefordert und implementiert, aber CDH5.5 kann nicht verwendet werden, da der Funke 1,5 beträgt.
Recommended Posts