[PYTHON] Utiliser un modèle scikit-learn formé à PySpark

Chose que tu veux faire

L'apprentissage automatique dans Spark est MLlib, mais il semble qu'il soit toujours inférieur à scicit-learn en termes de fonctionnalités. Par exemple, scikit-learn peut être utilisé pour corriger quand le nombre d'exemples positifs et négatifs est inégal pendant l'entraînement, mais mllib 1.5 n'a pas encore une telle fonction [^ class label correction]. Dans un tel cas, je pense que les avantages de sklearn et spark peuvent être utilisés si un apprenant est créé à l'avance avec scicit-learn avec des données qui peuvent être stockées en mémoire et qu'il peut être utilisé pour la prédiction de données à grande échelle avec pyspark. Je suis.

Let's Try

politique

Vous pouvez convertir les données en RDD de ndarray et les transmettre pour prédire le modèle entraîné par RDD map, mais si vous le faites tel quel, la surcharge de l'appel de fonction semble être importante, je voudrais donc la traiter en unités de lot d'une certaine taille. Je vais.

Préparation préalable

Préparez un environnement Python qui peut utiliser scikit-learn comme Anaconda dans le même chemin ( / opt / anaconda etc.) pour tous les nœuds spark. Si vous spécifiez PYSPARK_PYTHON = / opt / anaconda / bin / python3 lors de l'exécution de la commande spark-submit, ce Python sera utilisé.

Exemple d'implémentation

Créez un modèle d'apprentissage à l'avance. Cette fois, j'ai utilisé Random Forest. Les données sont appropriées.

import numpy as np
from sklearn import ensemble

N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.1, N)

model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)

Alors, utilisez ceci dans PySpark comme suit:

from pyspark import SparkContext
sc = SparkContext()

test_x = np.random.randn(N * 100, 10)
n_partitions = 10
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

# Point 2
b_model = sc.broadcast(model)

def split_id_and_data(xs):
    xs = list(xs)
    data = [x[0] for x in xs]
    ids = [x[1] for x in xs]
    return data, ids

# Point 3
result_rdd = batch_rdd.map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

for _id, pred in result_rdd.take(10):
    print(_id, pred)

sc.stop()

Les points sont les trois points suivants

  1. Utilisez mapPartitions pour convertir RDD [ndarray] en RDD [list [ndarray]]. En faisant cela, vous pouvez transmettre des morceaux de données ensemble dans model.predict.
  2. Diffusez le modèle entraîné.
  3. Séparez les identifiants et les données et transmettez les données à b_model.value.predict. Si vous zippez à nouveau ceci et les identifiants et le mettez dans flatMap, il sera terminé

(PostScript 2016-01-26) Mettre ensemble dans une liste sans partition

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

À l'origine, une méthode appelée «glom» a été préparée pour la pièce.

batch_rdd = rdd.glom()

(2016-01-26 Addendum 2)

DStream a également des méthodes glom et flatMap, vous pouvez donc l'utiliser exactement de la même manière pour Spark Streaming. Il semble que l'on puisse dire qu'un apprenant en détection d'anomalies est créé avec SVM et appliqué au streaming de données en temps réel.

[^ Correction de l'étiquette de classe]: JIRA a peut-être été demandé et implémenté bientôt, mais CDH5.5 ne peut pas être utilisé car spark est 1.5.

Recommended Posts

Utiliser un modèle scikit-learn formé à PySpark
Faire des inférences à l'aide du modèle entraîné de scicit-learn dans PySpark
Utiliser le modèle entraîné fastText de Python
Utiliser l'impression dans l'expression lambda Python2
Somme des variables dans un modèle mathématique
Utilisez Python 3 Subprocess.run () dans le script CGI
Implémenter un modèle utilisateur personnalisé dans Django
Utiliser WebDAV dans un environnement Docker portable
Python scikit-learn Une collection de conseils de modèles prédictifs souvent utilisés sur le terrain
Publication d'un modèle entraîné de fastText
Créer un modèle d'investissement dynamique simple en Python
Utilisez une page d'erreur personnalisée avec python / tornado
Python scikit-learn Une collection de conseils de modèles prédictifs souvent utilisés sur le terrain
Utilisez un GPU gratuit dans votre environnement préféré
Comment utiliser fixture dans Django pour saisir des exemples de données associés au modèle utilisateur
J'obtiens une exception java.util.regex.PatternSyntaxException lors du fractionnement d'une chaîne dans PySpark
Un mémorandum sur l'utilisation de keras.preprocessing.image de Keras
Pratique pour utiliser les sous-graphiques matplotlib dans l'instruction for
Utilisez communiquer () lors de la réception de la sortie dans un sous-processus Python
Utilisez le mot2vec appris de Sudachipy dans un environnement à faible mémoire
Comment utiliser le modèle appris dans Lobe en Python
Comment utiliser le modèle japonais Spacy avec Google Colaboratory
Utilisez config.ini avec Python
Utiliser DataFrame en Java
Utiliser des dates en Python
Utiliser Valgrind avec Python
Utilisez Property Decorator?
dict in dict Transforme un dict en dict
Modifications du modèle dans Django
Utiliser ujson dans les requêtes
Utiliser le profileur en Python
Implémentation de VGG16 à l'aide de Keras créé sans utiliser de modèle entraîné
Comment exécuter un modèle de transformateur entraîné localement sur CloudTPU
[Django] Gérez les paramètres comme l'écriture dans settings.py avec un modèle
Utilisez MeCab pour traduire des phrases bâclées de manière "lente".
Un mémorandum que vous utiliserez souvent avec Selenium en Python