Hello everyone. I'm @best_not_best. This time I will introduce technology related to the work I am in charge of.
Collaborative filtering is used to calculate a score for how likely a user is to purchase a product. Because the computation is heavy and processing large-scale data takes time, the processing is distributed with PySpark.
Add the following to spark-env.sh so that Python 3.x can be called on the Spark side.
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
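To confirm that Spark actually picks up Python 3, a quick check like the following can be run (a minimal sketch; the app name is arbitrary):

```python
from pyspark.sql import SparkSession

# start (or reuse) a session and print the Python version Spark reports
spark = SparkSession.builder.appName('python_version_check').getOrCreate()
print(spark.sparkContext.pythonVer)  # e.g. "3.6"
```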
Acquire training data from a database or similar source and create a CSV file with the three columns "user", "item", and "rating". For example, when using product purchase data for the past month, the columns are the user ID, the product ID, and whether or not the product was purchased (not purchased: 0 / purchased: 1). Below is an example of such a CSV file.
| user | item | rating | 
|---|---|---|
| 1xxxxxxxx2 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx9 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 
As the name "rating" suggests, the original usage is "how highly the user rated the product", but as mentioned above it can also be implemented with data such as "whether the product was purchased" or "whether the page was accessed". In the former case the model predicts "how likely the user is to buy the product", and in the latter case "how likely the user is to visit the page".
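As an aside, here is a minimal sketch (paths and column names are hypothetical) of how such 0/1 "ratings" could be derived from a raw purchase log: every user/product pair that appears in the log gets 1, and every other combination of a logged user and a logged product gets 0.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('build_rating_example').getOrCreate()

# purchase log with one row per purchase; columns: user, item (hypothetical path)
purchases = spark.read.csv('/path/to/purchase_log.csv', header=True)

users = purchases.select('user').distinct()
items = purchases.select('item').distinct()

# all user x item combinations, flagged 1 if the pair appears in the log, else 0
ratings = users.crossJoin(items)\
    .join(
        purchases.dropDuplicates(['user', 'item']).withColumn('rating', F.lit(1)),
        on=['user', 'item'],
        how='left'
    )\
    .fillna({'rating': 0})

ratings.write.csv('/path/to/training_data', header=True, mode='overwrite')
```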
Since user IDs and product IDs can only be handled up to the maximum value of `int32` (2,147,483,647), IDs are renumbered if any exceed that value. Also, since only integer values can be handled, renumbering is performed in the same way if the IDs contain character strings. If the IDs are integer values that do not exceed the maximum value of `int32`, skip this step.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""processing training data."""
from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
class ProcessTrainingData(object):
    """get training data from Redshift, and add sequence number to data."""
    def __get_action_log(
        self,
        sqlContext: SQLContext,
        unprocessed_data_file_path: str
    ) -> DataFrame:
        """get data."""
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(unprocessed_data_file_path)
        return df
    def run(
        self,
        unprocessed_data_file_path: str,
        training_data_dir_path: str
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('process_training_data')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
        # get data
        df = self.__get_action_log(sqlContext, unprocessed_data_file_path)
        # make sequence number of users
        unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        )
        # make sequence number of items
        unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        )
        # add sequence number of users, sequence number of items to data
        df = df.join(
            unique_users_df,
            df['user'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        df = df.join(
            unique_items_df,
            df['item'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])
        # save
        saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
        df.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)
        return True
The script reads the training data CSV, adds user ID and product ID columns renumbered with zipWithIndex(), and saves the result as a separate file.
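For reference, a tiny sketch of what zipWithIndex() produces, assuming a SparkSession named spark is available as in the script above (values are only examples; the order of the distinct elements, and therefore the assigned numbers, is not guaranteed):

```python
rdd = spark.sparkContext.parallelize(['1xxxxxxxx2', '1xxxxxxxx9', '1xxxxxxxx2'])
print(rdd.distinct().zipWithIndex().collect())
# e.g. [('1xxxxxxxx2', 0), ('1xxxxxxxx9', 1)] -- each distinct value gets a sequential number
```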
Execute as follows.
ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)
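For example, with hypothetical paths (details of each parameter follow below):

```python
ptd = ProcessTrainingData()
ptd.run(
    unprocessed_data_file_path='/tmp/cf/raw/action_log.csv',
    training_data_dir_path='/tmp/cf/training/'
)
```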
The parameters are as follows.
As mentioned above, set `unprocessed_data_file_path` to a CSV file with the following columns.
| user | item | rating | 
|---|---|---|
| 1xxxxxxxx2 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx9 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 
When executed, a CSV file with the following columns will be output to `training_data_dir_path`.
| user | item | rating | unique_user_id | unique_item_id | 
|---|---|---|---|---|
| 1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 | 
| 1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 | 
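Note that although the save path ends in `.csv`, Spark writes it as a directory containing part files. It can be read back as a single DataFrame, for example (a sketch using the hypothetical path from above and a SparkSession named spark):

```python
# read back the saved training data (the .csv path is a directory of part files)
df = spark.read.csv('/tmp/cf/training/cf_training_data.csv', header=True)
df.show(3)
```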
Create and save a collaborative filtering model.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""create collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreateCfModel(object):
    """create collaborative filtering model."""
    def run(
        self,
        processed_training_data_file_path: str,
        model_dir_path: str,
        rank: int,
        max_iter: int,
        implicit_prefs: bool,
        alpha: float,
        num_user_blocks: int,
        num_item_blocks: int
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_cf_model')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
        # create model
        als = ALS(
            rank=int(rank),
            maxIter=int(max_iter),
            implicitPrefs=bool(implicit_prefs),
            alpha=float(alpha),
            numUserBlocks=int(num_user_blocks),
            numItemBlocks=int(num_item_blocks),
            userCol='unique_user_id',
            itemCol='unique_item_id'
        )
        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # fitting
        model = als.fit(df)
        # save
        saved_data_dir_path = model_dir_path + 'als_model'
        model.write().overwrite().save(saved_data_dir_path)
        return True
If you did not need to renumber IDs in the previous section, modify the `# create model` and `# load training data` parts to match the column names in the CSV file, as follows.
# create model
als = ALS(
    rank=int(rank),
    maxIter=int(max_iter),
    implicitPrefs=bool(implicit_prefs),
    alpha=float(alpha),
    numUserBlocks=int(num_user_blocks),
    numItemBlocks=int(num_item_blocks),
    userCol='user',
    itemCol='item'
)
# load training data
custom_schema = StructType([
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
])
Execute as follows.
ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path,
    model_dir_path,
    rank,
    max_iter,
    implicit_prefs,
    alpha,
    num_user_blocks,
    num_item_blocks
)
The parameters are as follows. `rank`, `max_iter`, `implicit_prefs`, `alpha`, `num_user_blocks`, and `num_item_blocks` correspond to the [PySpark ALS parameters](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS).
If you renumbered IDs in the previous section, set `processed_training_data_file_path` to a CSV file with the following columns.
| user | item | rating | unique_user_id | unique_item_id | 
|---|---|---|---|---|
| 1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 | 
| 1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 | 
If you did not renumber, set a CSV file with the following columns.
| user | item | rating | 
|---|---|---|
| 1xxxxxxxx2 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx9 | 3xxxxxxxx5 | 1 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 
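For example, with hypothetical paths and hyperparameter values (these are only placeholders; `rank`, `max_iter`, and `alpha` in particular should be tuned for your own data):

```python
ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path='/tmp/cf/training/cf_training_data.csv',
    model_dir_path='/tmp/cf/model/',
    rank=10,
    max_iter=10,
    implicit_prefs=True,
    alpha=1.0,
    num_user_blocks=10,
    num_item_blocks=10
)
```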
Load the saved model and predict scores from combinations of user IDs and product IDs. Prepare list data of user IDs and product IDs separately from the training data. Both "all results" and "the top N scores for each user" are saved as CSV files. This time, user IDs and product IDs that do not exist in the original training data (that is, that have no action log) are not predicted.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""predict score from collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreatePredictedScore(object):
    """predict score from collaborative filtering model."""
    def run(
        self,
        model_file_path: str,
        predict_data_dir_path: str,
        user_data_file_path: str,
        item_data_file_path: str,
        processed_training_data_file_path: str,
        data_limit: int=-1
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_predicted_score')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
        # load user data
        users_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(user_data_file_path)
        users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
        users_id_df = sqlContext.createDataFrame(users_id_rdd)
        # load item data
        items_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(item_data_file_path)
        items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
        items_id_df = sqlContext.createDataFrame(items_id_rdd)
        # cross join user_id and item_id
        joined_df = users_id_df.join(items_id_df)
        joined_df.cache()
        # delete unnecessary variables
        del(users_df)
        del(users_id_rdd)
        del(users_id_df)
        del(items_df)
        del(items_id_rdd)
        del(items_id_df)
        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        training_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # users
        unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        ).dropDuplicates()
        unique_users_df.cache()
        # items
        unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        ).dropDuplicates()
        unique_items_df.cache()
        # delete unnecessary variables
        del(training_df)
        del(unique_users_rdd)
        del(unique_items_rdd)
        # add unique user id
        joined_df = joined_df.join(
            unique_users_df,
            joined_df['user_id'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        # add unique item id
        joined_df = joined_df.join(
            unique_items_df,
            joined_df['item_id'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])
        # load model
        model = ALSModel.load(model_file_path)
        # predict score
        predictions = model.transform(joined_df)
        all_predict_data = predictions\
            .select('user_id', 'item_id', 'prediction')\
            .filter('prediction > 0')
        # save
        # all score
        saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
        all_predict_data.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)
        # limited score
        data_limit = int(data_limit)
        if data_limit > 0:
            all_predict_data.registerTempTable('predictions')
            sql = 'SELECT user_id, item_id, prediction ' \
                + 'FROM ( ' \
                + '  SELECT user_id, item_id, prediction, dense_rank() ' \
                + '  OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
                + '  FROM predictions ' \
                + ') tmp WHERE rank <= %d' % (data_limit)
            limited_predict_data = sqlContext.sql(sql)
            saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
            limited_predict_data.write\
                .format('csv')\
                .mode('overwrite')\
                .options(header='true')\
                .save(saved_data_file_path)
        return True
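Incidentally, the same top-N extraction could also be written with the DataFrame API instead of SQL. The following sketch is equivalent to the dense_rank query above and assumes the `all_predict_data` DataFrame and `data_limit` from the method body:

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# rank each user's predictions in descending order and keep the top N
window = Window.partitionBy('user_id').orderBy(F.col('prediction').desc())
limited_predict_data = all_predict_data\
    .withColumn('rank', F.dense_rank().over(window))\
    .filter(F.col('rank') <= data_limit)\
    .drop('rank')
```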
The combinations of prediction targets are created by the following procedure: cross join the list of user IDs with the list of product IDs, then inner join the result with the renumbered user IDs and product IDs taken from the training data, so that users and products that do not appear in the training data are excluded.
If you did not need to renumber IDs in the first section, modify the `run` method as follows.
def run(
    self,
    model_file_path: str,
    predict_data_dir_path: str,
    processed_training_data_file_path: str,
    data_limit: int=-1
) -> bool:
    """execute."""
    # make spark context
    spark = SparkSession\
        .builder\
        .appName('create_predicted_score')\
        .config('spark.sql.crossJoin.enabled', 'true')\
        .config('spark.debug.maxToStringFields', 500)\
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
    # load training data
    custom_schema = StructType([
        StructField('user', IntegerType(), True),
        StructField('item', IntegerType(), True),
        StructField('rating', FloatType(), True),
    ])
    training_df = sqlContext\
        .read\
        .format('csv')\
        .options(header='true')\
        .load(processed_training_data_file_path, schema=custom_schema)
    # load user data
    users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
    users_id_df = sqlContext.createDataFrame(users_id_rdd)
    # load item data
    items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
    items_id_df = sqlContext.createDataFrame(items_id_rdd)
    # cross join user_id and item_id
    joined_df = users_id_df.join(items_id_df)
    joined_df.cache()
    # delete unnecessary variables
    del(training_df)
    del(users_id_rdd)
    del(users_id_df)
    del(items_id_rdd)
    del(items_id_df)
    # load model
    model = ALSModel.load(model_file_path)
(the rest of the method is the same as the original)
Execute as follows.
cps = CreatePredictedScore()
cps.run(
    model_file_path,
    predict_data_dir_path,
    user_data_file_path,
    item_data_file_path,
    processed_training_data_file_path,
    data_limit
)
The parameters are as follows. `data_limit` specifies the N of the top N scores; if 0 or less is specified, the top-N data is not created.
Set `user_data_file_path` to a CSV file with user IDs in the first column; a file without a header is used here. Set `item_data_file_path` to a CSV file with product IDs in the first column, likewise without a header.
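For illustration, such header-less list files could be produced like this (a sketch with hypothetical paths and column names; in practice the lists may come from a user master or product master):

```python
# hypothetical master tables; only the ID columns are kept
user_master = spark.read.csv('/path/to/user_master.csv', header=True)
item_master = spark.read.csv('/path/to/item_master.csv', header=True)

# the header is omitted by default, producing the header-less files described above
user_master.select('user_id').distinct().write.csv('/tmp/cf/users/', mode='overwrite')
item_master.select('item_id').distinct().write.csv('/tmp/cf/items/', mode='overwrite')
```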
Set `processed_training_data_file_path` to a CSV file with the following columns.
| user | item | rating | unique_user_id | unique_item_id | 
|---|---|---|---|---|
| 1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 | 
| 1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 | 
| 1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 | 
A CSV file with the following columns is output to `predict_data_dir_path`.
| user_id | item_id | prediction | 
|---|---|---|
| 1xxxxxxxx3 | 3xxxxxxxx4 | 0.15594198 | 
| 1xxxxxxxx3 | 3xxxxxxxx0 | 0.19135818 | 
| 1xxxxxxxx3 | 3xxxxxxxx8 | 0.048197098 | 
`prediction` is the predicted value.
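To inspect the result, the saved file can be read back and filtered per user, for example (a sketch; the path and user ID are hypothetical, and `prediction` is cast to float so the ordering is numeric):

```python
from pyspark.sql import functions as F

preds = spark.read.csv('/tmp/cf/predict/als_predict_data_limit.csv', header=True)
preds.filter(preds['user_id'] == '1xxxxxxxx3')\
    .orderBy(F.col('prediction').cast('float').desc())\
    .show()
```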
That is how to implement collaborative filtering with ALS in PySpark. Sorry that the code is hard to read because I haven't split it into smaller methods. Next time, I will explain how to evaluate the model.