Hello everyone, this is @best_not_best. This time I will introduce some technology related to the work I am in charge of.
Collaborative filtering is used to calculate a score for how likely a user is to purchase a product. Because the computation is heavy and processing large-scale data takes time, the work is distributed with PySpark.
Add the following to spark-env.sh so that Python 3.x can be called on the Spark side.
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
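To confirm that the setting is picked up, a quick check like the following can be run (this snippet is my own addition; `SparkContext.pythonVer` reports the Python version PySpark is running on):
# quick check: print the Python version PySpark is using
import sys
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('python_version_check').getOrCreate()
print(sys.version_info)              # driver Python version
print(spark.sparkContext.pythonVer)  # Python version seen by PySpark, e.g. '3.6'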
Acquire training data from a database or similar source, and create a CSV file with the three columns "user", "item", and "rating". For example, when using product purchase data for the past month, the columns are the user ID, the product ID, and whether the product was purchased (not purchased: 0 / purchased: 1). Below is an example of such a CSV file.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
As the name `rating` suggests, the original usage is "how highly the user rated the product", but as mentioned above it can also be implemented with data such as "whether the product was purchased" or "whether the page was accessed". In the former case the model predicts "how likely the user is to buy the product", and in the latter case "how likely the user is to visit the page".
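As an illustration only (the source file and column names here are hypothetical), such a 0/1 rating CSV could be produced from a raw event log like this:
# build user/item/rating (0 = not purchased, 1 = purchased) from a hypothetical event log
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('build_training_csv').getOrCreate()
# hypothetical input with columns: user, item, purchased (0/1)
log_df = spark.read.options(header='true').csv('/path/to/event_log.csv')
rating_df = log_df.groupBy('user', 'item')\
    .agg(F.max(F.col('purchased').cast('int')).alias('rating'))
rating_df.write.mode('overwrite').options(header='true').csv('/path/to/unprocessed_data.csv')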
User IDs and product IDs can only be handled up to the maximum value of `int32` (2,147,483,647), so if any ID exceeds that, the IDs are renumbered. Also, since only integer values can be handled, string IDs are renumbered in the same way. If the IDs are integers that do not exceed the `int32` maximum, skip this step.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""processing training data."""
from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
class ProcessTrainingData(object):
"""get training data from Redshift, and add sequence number to data."""
def __get_action_log(
self,
sqlContext: SQLContext,
unprocessed_data_file_path: str
) -> DataFrame:
"""get data."""
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(unprocessed_data_file_path)
return df
def run(
self,
unprocessed_data_file_path: str,
training_data_dir_path: str
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('process_training_data')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# get data
df = self.__get_action_log(sqlContext, unprocessed_data_file_path)
# make sequence number of users
unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
)
# make sequence number of items
unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
)
# add sequence number of users, sequence number of items to data
df = df.join(
unique_users_df,
df['user'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
df = df.join(
unique_items_df,
df['item'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# save
saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
df.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
This reads the training-data CSV, adds user ID and product ID columns renumbered with `zipWithIndex()`, and saves the result as a separate file.
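For reference, `zipWithIndex()` pairs each RDD element with a sequential number starting from 0, which is what produces the new IDs. A minimal illustration (my own example, assuming an active SparkSession named spark):
# zipWithIndex() assigns a sequential number to each distinct value
rdd = spark.sparkContext.parallelize(['1xxxxxxxx2', '1xxxxxxxx9', '1xxxxxxxx8'])
print(rdd.distinct().zipWithIndex().collect())
# e.g. [('1xxxxxxxx2', 0), ('1xxxxxxxx9', 1), ('1xxxxxxxx8', 2)]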
Execute as follows.
ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)
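For example, with hypothetical paths (note that the directory path needs a trailing slash, because the code concatenates the output file name directly onto it):
ptd = ProcessTrainingData()
ptd.run(
    '/path/to/unprocessed_data.csv',  # unprocessed_data_file_path
    '/path/to/training_data/'         # training_data_dir_path
)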
The parameters are as follows. As mentioned above, set `unprocessed_data_file_path` to a CSV file with the following columns, and set `training_data_dir_path` to the directory where the processed file will be saved.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
When executed, a CSV file with the following columns is output to `training_data_dir_path`.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
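As a quick sanity check (my own addition, assuming a sqlContext like the one created above and a hypothetical output path), you can confirm that the renumbered IDs stay within the `int32` range:
# confirm the renumbered IDs fit in int32 (hypothetical output path)
from pyspark.sql import functions as F
df = sqlContext.read.format('csv').options(header='true')\
    .load('/path/to/training_data/cf_training_data.csv')
df.select(
    F.max(F.col('unique_user_id').cast('long')).alias('max_unique_user_id'),
    F.max(F.col('unique_item_id').cast('long')).alias('max_unique_item_id')
).show()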
Create and save a model of collaborative filtering.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""create collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreateCfModel(object):
"""create collaborative filtering model."""
def run(
self,
processed_training_data_file_path: str,
model_dir_path: str,
rank: int,
max_iter: int,
implicit_prefs: str,
alpha: float,
num_user_blocks: int,
num_item_blocks: int
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_cf_model')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='unique_user_id',
itemCol='unique_item_id'
)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# fitting
model = als.fit(df)
# save
saved_data_dir_path = model_dir_path + 'als_model'
model.write().overwrite().save(saved_data_dir_path)
return True
If you did not need to renumber IDs in the previous section, change the `# create model` and `# load training data` parts to match the column names in the CSV file, as follows.
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='user',
itemCol='item'
)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
Execute as follows.
ccm = CreateCfModel()
ccm.run(
processed_training_data_file_path,
model_dir_path,
rank,
max_iter,
implicit_prefs,
alpha,
num_user_blocks,
num_item_blocks
)
The parameters are as follows. `rank`, `max_iter`, `implicit_prefs`, `alpha`, `num_user_blocks`, and `num_item_blocks` are [PySpark ALS parameters](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS). `model_dir_path` is the directory where the model is saved.
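For example, with hypothetical paths and placeholder hyperparameter values (these are not tuned recommendations; see the linked documentation for each parameter's meaning):
ccm = CreateCfModel()
ccm.run(
    '/path/to/training_data/cf_training_data.csv',  # processed_training_data_file_path
    '/path/to/model/',                              # model_dir_path
    10,      # rank
    10,      # max_iter
    'True',  # implicit_prefs
    0.01,    # alpha
    10,      # num_user_blocks
    10       # num_item_blocks
)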
If you renumbered the IDs in the previous section, set `processed_training_data_file_path` to a CSV file with the following columns.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
If you did not renumber, set a CSV file with the following columns.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Load the saved model and predict scores for combinations of user IDs and product IDs. The user ID list and the product ID list are prepared separately from the training data. The prediction results are saved as CSV files in two forms: all results, and the top N scores per user. This time, user IDs and product IDs that do not appear in the original training data (i.e., that have no action log) are not predicted.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""predict score from collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreatePredictedScore(object):
"""predict score from collaborative filtering model."""
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
user_data_file_path: str,
item_data_file_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load user data
users_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(user_data_file_path)
users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(item_data_file_path)
items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(users_df)
del(users_id_rdd)
del(users_id_df)
del(items_df)
del(items_id_rdd)
del(items_id_df)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# users
unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
).dropDuplicates()
unique_users_df.cache()
# items
unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
).dropDuplicates()
unique_items_df.cache()
# delete unnecessary variables
del(training_df)
del(unique_users_rdd)
del(unique_items_rdd)
# add unique user id
joined_df = joined_df.join(
unique_users_df,
joined_df['user_id'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
# add unique item id
joined_df = joined_df.join(
unique_items_df,
joined_df['item_id'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# load model
model = ALSModel.load(model_file_path)
# predict score
predictions = model.transform(joined_df)
all_predict_data = predictions\
.select('user_id', 'item_id', 'prediction')\
.filter('prediction > 0')
# save
# all score
saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
all_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
# limited score
data_limit = int(data_limit)
if data_limit > 0:
all_predict_data.registerTempTable('predictions')
sql = 'SELECT user_id, item_id, prediction ' \
+ 'FROM ( ' \
+ ' SELECT user_id, item_id, prediction, dense_rank() ' \
+ ' OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
+ ' FROM predictions ' \
+ ') tmp WHERE rank <= %d' % (data_limit)
limited_predict_data = sqlContext.sql(sql)
saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
limited_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
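As a side note, the top-N extraction above uses Spark SQL with `dense_rank()`; an equivalent sketch using the DataFrame window API (my own rewrite of the same logic, not part of the original code) looks like this:
# same top-N per user, written with the DataFrame window API instead of SQL
from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank
w = Window.partitionBy('user_id').orderBy(col('prediction').desc())
limited_predict_data = all_predict_data\
    .withColumn('rank', dense_rank().over(w))\
    .filter(col('rank') <= data_limit)\
    .drop('rank')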
The prediction-target combinations are created by the following procedure: the user ID list and the product ID list are cross joined, and the result is inner joined with the training data to attach the renumbered IDs, which also drops users and products that have no action log. If you did not need to renumber in the first section, modify the run method as follows:
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# load user data
users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(training_df)
del(users_id_rdd)
del(users_id_df)
del(items_id_rdd)
del(items_id_df)
# load model
model = ALSModel.load(model_file_path)
(the rest is the same as the original code above)
Execute as follows.
cps = CreatePredictedScore()
cps.run(
model_file_path,
predict_data_dir_path,
user_data_file_path,
item_data_file_path,
processed_training_data_file_path,
data_limit
)
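For example, with hypothetical paths, requesting the top 10 items per user:
cps = CreatePredictedScore()
cps.run(
    '/path/to/model/als_model',                     # model_file_path
    '/path/to/predict_data/',                       # predict_data_dir_path
    '/path/to/user_ids.csv',                        # user_data_file_path
    '/path/to/item_ids.csv',                        # item_data_file_path
    '/path/to/training_data/cf_training_data.csv',  # processed_training_data_file_path
    10                                              # data_limit
)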
The parameters are as follows. `model_file_path` is the directory where the model was saved in the previous section, and `predict_data_dir_path` is the output directory. `data_limit` specifies N for the top N scores; if 0 or less is specified, the top-N data is not created. Set `user_data_file_path` to a CSV file with user IDs in the first column; I use a file without a header. Set `item_data_file_path` to a CSV file with product IDs in the first column, likewise without a header.
Set `processed_training_data_file_path` to a CSV file with the following columns.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
CSV files with the following columns are output to `predict_data_dir_path`.
user_id | item_id | prediction |
---|---|---|
1xxxxxxxx3 | 3xxxxxxxx4 | 0.15594198 |
1xxxxxxxx3 | 3xxxxxxxx0 | 0.19135818 |
1xxxxxxxx3 | 3xxxxxxxx8 | 0.048197098 |
`prediction` is the predicted score.
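To inspect the result, a small sketch like the following (my own addition, assuming the hypothetical output path above and the user ID from the example table) lists one user's top recommendations:
# read the limited (top-N) output and show one user's recommendations, highest score first
preds = sqlContext.read.format('csv').options(header='true')\
    .load('/path/to/predict_data/als_predict_data_limit.csv')
preds.filter(preds['user_id'] == '1xxxxxxxx3')\
    .orderBy(preds['prediction'].cast('float').desc())\
    .show()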
That is how to implement collaborative filtering with ALS in PySpark. I apologize that the code is hard to read because I did not split it into smaller methods. Next time, I will explain how to evaluate the model.