[PYTHON] Verwenden Sie ein in PySpark geschultes Scikit-Lernmodell

Was du machen willst

Maschinelles Lernen in Spark ist MLlib, aber es scheint, dass es dem Scicit-Learn in Bezug auf die Funktionalität immer noch unterlegen ist. Zum Beispiel kann Scikit-Learn verwendet werden, um zu korrigieren, wenn die Anzahl der positiven und negativen Beispiele während des Trainings ungleichmäßig ist, aber mllib 1.5 hat noch keine solche Funktion [^ Korrektur der Klassenbezeichnung]. In einem solchen Fall denke ich, dass die Vorteile von sklearn und spark genutzt werden können, wenn ein Lernender im Voraus mit scicit-learn mit genügend Daten erstellt wird, um in den Speicher zu passen, und es kann zur Vorhersage großer Datenmengen mit pyspark verwendet werden. Ich bin.

Let's Try

Politik

Sie können die Daten in RDD von ndarray konvertieren und an die Vorhersage des von RDD map trainierten Modells übergeben. Wenn Sie dies jedoch so tun, wie es ist, scheint der Overhead des Funktionsaufrufs groß zu sein, sodass ich es in Batch-Einheiten einer bestimmten Größe verarbeiten möchte. Ich werde.

Vorbereitungen

Bereiten Sie eine Python-Umgebung vor, die "scikit-learn" wie Anaconda im selben Pfad ("/ opt / anaconda" usw.) für alle Funkenknoten verwenden kann. Wenn Sie beim Ausführen des Befehls spark-submit "PYSPARK_PYTHON = / opt / anaconda / bin / python3" angeben, wird dieser Python verwendet.

Implementierungsbeispiel

Erstellen Sie im Voraus ein Lernmodell. Diesmal habe ich Random Forest verwendet. Die Daten sind angemessen.

import numpy as np
from sklearn import ensemble

N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.1, N)

model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)

Verwenden Sie dies in PySpark wie folgt:

from pyspark import SparkContext
sc = SparkContext()

test_x = np.random.randn(N * 100, 10)
n_partitions = 10
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

# Point 2
b_model = sc.broadcast(model)

def split_id_and_data(xs):
    xs = list(xs)
    data = [x[0] for x in xs]
    ids = [x[1] for x in xs]
    return data, ids

# Point 3
result_rdd = batch_rdd.map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

for _id, pred in result_rdd.take(10):
    print(_id, pred)

sc.stop()

Die Punkte sind die folgenden drei Punkte

  1. Verwenden Sie mapPartitions, um "RDD [ndarray]" in "RDD [list [ndarray]]" zu konvertieren. Auf diese Weise können Sie einige Datenblöcke in model.predict zusammen übergeben.
  2. Senden Sie das trainierte Modell.
  3. Trennen Sie IDs und Daten und übergeben Sie Daten an b_model.value.predict. Wenn Sie dies und die IDs erneut komprimieren und in flatMap einfügen, wird es abgeschlossen

(Nachtrag vom 26.01.2016) In einer Liste ohne Partition zusammenstellen

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

Ursprünglich wurde für das Teil eine Methode namens "glom" vorbereitet.

batch_rdd = rdd.glom()

(26.01.2016 Nachtrag 2)

DStream verfügt auch über die Methoden glom und flatMap, sodass Sie es für Spark Streaming genauso verwenden können. Es kann gesagt werden, dass mit SVM ein Lerner zur Erkennung von Anomalien erstellt und in Echtzeit auf das Streaming von Daten angewendet wird.

[^ Korrektur der Klassenbezeichnung]: JIRA wurde möglicherweise bald angefordert und implementiert, aber CDH5.5 kann nicht verwendet werden, da der Funke 1,5 beträgt.

Recommended Posts

Verwenden Sie ein in PySpark geschultes Scikit-Lernmodell
Machen Sie Schlussfolgerungen mit dem trainierten Modell des Scicit-Learn in PySpark
Verwenden Sie das von fastText trainierte Modell von Python
Verwenden Sie print in Python2 lambda expression
Summe der Variablen in einem mathematischen Modell
Verwenden Sie Python 3 Subprocess.run () im CGI-Skript
Implementieren Sie ein benutzerdefiniertes Benutzermodell in Django
Verwenden Sie WebDAV in einer Portable Docker-Umgebung
Python scikit-learn Eine Sammlung von Tipps für Vorhersagemodelle, die häufig im Feld verwendet werden
Veröffentlichung eines geschulten Modells von fastText
Erstellen Sie in Python ein einfaches Momentum-Investmentmodell
Verwenden Sie eine benutzerdefinierte Fehlerseite mit Python / Tornado
Python scikit-learn Eine Sammlung von Tipps für Vorhersagemodelle, die häufig im Feld verwendet werden
Verwenden Sie eine kostenlose GPU in Ihrer Lieblingsumgebung
Verwendung von Fixture in Django zur Eingabe von Beispieldaten für das Benutzermodell
Ich erhalte eine java.util.regex.PatternSyntaxException, wenn ich einen String in PySpark teile
Ein Memorandum zur Verwendung von Keras 'keras.preprocessing.image
Praktisch, um Matplotlib-Unterzeichnungen in for-Anweisungen zu verwenden
Verwenden Sie communic (), wenn Sie eine Ausgabe in einem Python-Unterprozess empfangen
Verwenden Sie Sudachipys gelerntes word2vec in einer Umgebung mit wenig Speicher
Verwendung des in Lobe in Python erlernten Modells
Verwendung des japanischen Spacy-Modells mit Google Colaboratory
Verwenden Sie config.ini mit Python
Verwenden Sie DataFrame in Java
Verwenden Sie Datumsangaben in Python
Verwenden Sie Valgrind mit Python
Property Decorator verwenden?
diktieren in diktieren Macht ein Diktat ein Diktat
Modelländerungen in Django
Verwenden Sie ujson in Anfragen
Verwenden Sie den Profiler in Python
Implementierung von VGG16 mit Keras, die ohne Verwendung eines trainierten Modells erstellt wurden
So führen Sie ein geschultes Transformatormodell lokal auf CloudTPU aus
[Django] Verwalten Sie Einstellungen wie das Schreiben in settings.py mit einem Modell
Verwenden Sie MeCab, um schlampige Sätze "langsam" zu übersetzen.
Ein Memorandum, das Sie häufig mit Selen in Python verwenden