L'apprentissage automatique dans Spark est MLlib, mais il semble qu'il soit toujours inférieur à scicit-learn en termes de fonctionnalités. Par exemple, scikit-learn peut être utilisé pour corriger quand le nombre d'exemples positifs et négatifs est inégal pendant l'entraînement, mais mllib 1.5 n'a pas encore une telle fonction [^ class label correction]. Dans un tel cas, je pense que les avantages de sklearn et spark peuvent être utilisés si un apprenant est créé à l'avance avec scicit-learn avec des données qui peuvent être stockées en mémoire et qu'il peut être utilisé pour la prédiction de données à grande échelle avec pyspark. Je suis.
Let's Try
Vous pouvez convertir les données en RDD de ndarray et les transmettre pour prédire le modèle entraîné par RDD map, mais si vous le faites tel quel, la surcharge de l'appel de fonction semble être importante, je voudrais donc la traiter en unités de lot d'une certaine taille. Je vais.
Préparez un environnement Python qui peut utiliser scikit-learn
comme Anaconda dans le même chemin ( / opt / anaconda
etc.) pour tous les nœuds spark. Si vous spécifiez PYSPARK_PYTHON = / opt / anaconda / bin / python3
lors de l'exécution de la commande spark-submit, ce Python sera utilisé.
Créez un modèle d'apprentissage à l'avance. Cette fois, j'ai utilisé Random Forest. Les données sont appropriées.
import numpy as np
from sklearn import ensemble
N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.1, N)
model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)
Alors, utilisez ceci dans PySpark comme suit:
from pyspark import SparkContext
sc = SparkContext()
test_x = np.random.randn(N * 100, 10)
n_partitions = 10
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()
# Point 1
def batch(xs):
yield list(xs)
batch_rdd = rdd.mapPartitions(batch)
# Point 2
b_model = sc.broadcast(model)
def split_id_and_data(xs):
xs = list(xs)
data = [x[0] for x in xs]
ids = [x[1] for x in xs]
return data, ids
# Point 3
result_rdd = batch_rdd.map(split_id_and_data) \
.flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))
for _id, pred in result_rdd.take(10):
print(_id, pred)
sc.stop()
Les points sont les trois points suivants
RDD [ndarray]
en RDD [list [ndarray]]
. En faisant cela, vous pouvez transmettre des morceaux de données ensemble dans model.predict
.b_model.value.predict
. Si vous zippez à nouveau ceci et les identifiants et le mettez dans flatMap, il sera terminé(PostScript 2016-01-26) Mettre ensemble dans une liste sans partition
# Point 1
def batch(xs):
yield list(xs)
batch_rdd = rdd.mapPartitions(batch)
À l'origine, une méthode appelée «glom» a été préparée pour la pièce.
batch_rdd = rdd.glom()
(2016-01-26 Addendum 2)
DStream a également des méthodes glom
et flatMap
, vous pouvez donc l'utiliser exactement de la même manière pour Spark Streaming. Il semble que l'on puisse dire qu'un apprenant en détection d'anomalies est créé avec SVM et appliqué au streaming de données en temps réel.
[^ Correction de l'étiquette de classe]: JIRA a peut-être été demandé et implémenté bientôt, mais CDH5.5 ne peut pas être utilisé car spark est 1.5.