[PYTHON] Use a model trained with scikit-learn in PySpark

What I want to do

Machine learning in Spark means MLlib, but it still seems inferior to scikit-learn in terms of functionality. For example, scikit-learn can correct for an imbalance between positive and negative examples during training, but MLlib 1.5 does not have such a feature yet [^class label correction]. In such cases, I think we can get the best of both sklearn and Spark by first training a model with scikit-learn on data that fits in memory, and then using it from PySpark for prediction on large-scale data.

Let's Try

Approach

You could convert the data to an RDD of ndarrays and call the trained model's predict inside a map, but doing that one record at a time incurs a large function-call overhead, so I want to process the data in batches of a certain size.
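For contrast, the naive per-record version would look like the sketch below (row_rdd and b_model are assumed names for an RDD of feature rows and a broadcast model, set up in the implementation example that follows); each row triggers its own predict call:

# naive version: one predict call per row, so per-call overhead dominates
predictions = row_rdd.map(lambda x: b_model.value.predict(x.reshape(1, -1))[0])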

Advance preparation

Prepare a Python environment with scikit-learn available, such as Anaconda, at the same path (e.g. /opt/anaconda) on all Spark nodes. If you specify PYSPARK_PYTHON=/opt/anaconda/bin/python3 when running the spark-submit command, this Python will be used.
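For example, a submission might look like this (a sketch; predict.py is a placeholder for your script):

PYSPARK_PYTHON=/opt/anaconda/bin/python3 spark-submit predict.py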

Implementation example

Create the model in advance. This time I used a random forest. The data is just random dummy data.

import numpy as np
from sklearn import ensemble

N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.1, N)

model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)
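In this example everything runs in one script, so the model object is simply reused. If training happens in a separate process, you could persist the model and load it on the Spark driver; a minimal sketch using joblib (the file name is an assumption; in older scikit-learn versions joblib is available as sklearn.externals.joblib):

import joblib

joblib.dump(model, "model.pkl")   # persist after training
model = joblib.load("model.pkl")  # reload on the Spark driver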

Now, use this model in PySpark as follows:

from pyspark import SparkContext
sc = SparkContext()

test_x = np.random.randn(N * 100, 10)
n_partitions = 10
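# zipWithIndex pairs each row with its index, which we use as a row id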
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

# Point 2
b_model = sc.broadcast(model)

def split_id_and_data(xs):
    xs = list(xs)
    data = [x[0] for x in xs]
    ids = [x[1] for x in xs]
    return data, ids

# Point 3
result_rdd = batch_rdd.map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

for _id, pred in result_rdd.take(10):
    print(_id, pred)

sc.stop()

The key points are the following three:

  1. Use mapPartitions to convert the RDD[ndarray] into an RDD[list[ndarray]]. This lets you pass a whole chunk of data to model.predict at once.
  2. Broadcast the trained model.
  3. Split the ids off from the data, pass the data to b_model.value.predict, zip the predictions back together with the ids, and flatMap the result. Done.

(2016-01-26 postscript) Gathering each partition into a list

# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)

It turns out Spark already provides a method called glom for this part:

batch_rdd = rdd.glom()
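With glom, the whole prediction pipeline above can be written as the following equivalent sketch (reusing split_id_and_data and b_model from the implementation example):

result_rdd = rdd.glom() \
    .map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))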

(2016-01-26 Addendum 2)

DStream also has glom and flatMap methods, so exactly the same approach works with Spark Streaming. For example, you could train an anomaly detector with an SVM and apply it to streaming data in real time, as sketched below.
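A minimal sketch of that idea, assuming an anomaly detector trained with sklearn.svm.OneClassSVM has been broadcast as b_model, and that comma-separated feature rows arrive on a socket (the host, port, and batch interval are placeholders):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)
lines = ssc.socketTextStream("localhost", 9999)  # placeholder source

# parse each line "0.1,0.2,..." into a feature vector
rows = lines.map(lambda line: [float(v) for v in line.split(",")])

# glom gathers each partition into a list, so predict runs once per partition
preds = rows.glom().flatMap(
    lambda xs: b_model.value.predict(np.array(xs)).tolist() if xs else [])
preds.pprint()

ssc.start()
ssc.awaitTermination()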

[^class label correction]: A request may have been filed on JIRA and it may be implemented before long, but since CDH 5.5 ships Spark 1.5, it cannot be used there.
