Hallo zusammen. @best_not_best. Dieses Mal werde ich die Technologie vorstellen, die zu meiner Arbeit passt.
Die Co-Filterung wird verwendet, um die Punktzahl für den Kauf eines Produkts durch einen Benutzer zu berechnen. Da der Rechenaufwand groß ist und die Verarbeitung umfangreicher Daten einige Zeit in Anspruch nimmt, führt PySpark eine verteilte Verarbeitung durch.
Fügen Sie spark-env.sh Folgendes hinzu, damit Sie Python 3.x auf der Spark-Seite aufrufen können.
export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3
Erfassen Sie Trainingsdaten aus einer Datenbank usw. und erstellen Sie eine CSV-Datei mit 3 Spalten "Benutzer", "Element" und "Bewertung". Wenn Sie beispielsweise "Produktkaufdaten für den letzten Monat" verwenden, lauten die Daten "Benutzer-ID", "Produkt-ID" und "ob das Produkt gekauft wurde oder nicht (nicht gekauft: 0 / gekauft: 1)". Unten finden Sie ein Beispiel für eine CSV-Datei.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Als Name für "Bewertung" ist "wie viel der Benutzer dem Produkt eine Bewertung gegeben hat" die ursprüngliche Verwendung, aber wie oben erwähnt "ob das Produkt gekauft wurde" oder "auf die Seite zugegriffen". Es ist auch möglich, mit Daten wie "ob oder nicht" zu implementieren. Im ersteren Fall handelt es sich um ein Modell, das vorhersagt, "wie viel der Benutzer das Produkt kauft" und im letzteren Fall, "wie oft der Benutzer die Seite besucht".
Da die Benutzer-ID und die Produkt-ID nur bis zum Maximalwert von "int32" (2.147.483.647) verarbeitet werden können, wird die ID erneut nummeriert, wenn eine ID vorliegt, die diese überschreitet. Da es nur ganzzahlige Werte verarbeiten kann, wird es auf die gleiche Weise neu nummeriert, wenn es eine Zeichenfolge enthält. Wenn die ID ein ganzzahliger Wert ist und den Maximalwert von "int32" nicht überschreitet, überspringen Sie diesen Schritt.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""processing training data."""
from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
class ProcessTrainingData(object):
"""get training data from Redshift, and add sequence number to data."""
def __get_action_log(
self,
sqlContext: SQLContext,
unprocessed_data_file_path: str
) -> DataFrame:
"""get data."""
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(unprocessed_data_file_path)
return df
def run(
self,
unprocessed_data_file_path: str,
training_data_dir_path: str
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('process_training_data')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# get data
df = self.__get_action_log(sqlContext, unprocessed_data_file_path)
# make sequence number of users
unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
)
# make sequence number of items
unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
)
# add sequence number of users, sequence number of items to data
df = df.join(
unique_users_df,
df['user'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
df = df.join(
unique_items_df,
df['item'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# save
saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
df.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
Lesen Sie die CSV der Trainingsdaten, fügen Sie die mit "zipWithIndex ()" neu nummerierten Spalten "Benutzer-ID" und "Produkt-ID" hinzu und speichern Sie sie als separate Datei. Führen Sie wie folgt aus.
ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)
Die Parameter sind wie folgt.
Wie oben erwähnt, setzen Sie "unprocessed_data_file_path" auf eine CSV-Datei mit den folgenden Spalten.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Bei der Ausführung wird eine CSV-Datei mit den folgenden Spalten an training_data_dir_path
ausgegeben.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
Erstellen und speichern Sie ein Modell für die Co-Filterung.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""create collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreateCfModel(object):
"""create collaborative filtering model."""
def run(
self,
processed_training_data_file_path: str,
model_dir_path: str,
rank: int,
max_iter: int,
implicit_prefs: str,
alpha: float,
num_user_blocks: int,
num_item_blocks: int
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_cf_model')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='unique_user_id',
itemCol='unique_item_id'
)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# fitting
model = als.fit(df)
# save
saved_data_dir_path = model_dir_path + 'als_model'
model.write().overwrite().save(saved_data_dir_path)
return True
Wenn Sie im vorherigen Abschnitt nicht neu nummerieren mussten, ordnen Sie die Spaltennamen in der CSV-Datei den Spaltennamen "Modell erstellen" und "Trainingsdaten laden" zu und ändern Sie sie wie folgt.
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='user',
itemCol='item'
)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
Führen Sie wie folgt aus.
ccm = CreateCfModel()
ccm.run(
processed_training_data_file_path,
model_dir_path,
rank,
max_iter,
implicit_prefs,
alpha,
num_user_blocks,
num_item_blocks
)
Die Parameter sind wie folgt. rank
, max_iter
, implicit_prefs
, alpha
, num_user_blocks
, num_item_blocks
sind [PySpark ALS-Parameter](http://spark.apache.org/docs/latest/api/python/ Es wird pyspark.ml.html # pyspark.ml.recommendation.ALS sein.
Wenn im vorherigen Abschnitt nummeriert, legen Sie die CSV-Datei mit den folgenden Spalten in "verarbeiteter_training_data_file_path" fest.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
Wenn nicht nummeriert, legen Sie eine CSV-Datei mit den folgenden Spalten fest.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Laden Sie das gespeicherte Modell und prognostizieren Sie die Punktzahl aus der Kombination von Benutzer-ID und Produkt-ID. Bereiten Sie Listendaten für jede Benutzer-ID und Produkt-ID getrennt von den Trainingsdaten vor. Für das Vorhersageergebnis werden "alle Ergebnisse" und "Ergebnisse der besten N Bewertungen für jeden Benutzer" als CSV-Datei gespeichert. Diesmal werden die Benutzer-ID und die Produkt-ID, die in den ursprünglichen Trainingsdaten nicht vorhanden sind (= Aktionsprotokoll existiert nicht), nicht vorhergesagt.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""predict score from collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreatePredictedScore(object):
"""predict score from collaborative filtering model."""
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
user_data_file_path: str,
item_data_file_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load user data
users_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(user_data_file_path)
users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(item_data_file_path)
items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(users_df)
del(users_id_rdd)
del(users_id_df)
del(items_df)
del(items_id_rdd)
del(items_id_df)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# users
unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
).dropDuplicates()
unique_users_df.cache()
# items
unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
).dropDuplicates()
unique_items_df.cache()
# delete unnecessary variables
del(training_df)
del(unique_users_rdd)
del(unique_items_rdd)
# add unique user id
joined_df = joined_df.join(
unique_users_df,
joined_df['user_id'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
# add unique item id
joined_df = joined_df.join(
unique_items_df,
joined_df['item_id'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# load model
model = ALSModel.load(model_file_path)
# predict score
predictions = model.transform(joined_df)
all_predict_data = predictions\
.select('user_id', 'item_id', 'prediction')\
.filter('prediction > 0')
# save
# all score
saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
all_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
# limited score
data_limit = int(data_limit)
if data_limit > 0:
all_predict_data.registerTempTable('predictions')
sql = 'SELECT user_id, item_id, prediction ' \
+ 'FROM ( ' \
+ ' SELECT user_id, item_id, prediction, dense_rank() ' \
+ ' OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
+ ' FROM predictions ' \
+ ') tmp WHERE rank <= %d' % (data_limit)
limited_predict_data = sqlContext.sql(sql)
saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
limited_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
Die Kombination von Vorhersagezielen wird durch das folgende Verfahren erstellt.
Wenn Sie im ersten Abschnitt nicht neu nummerieren mussten, ändern Sie die Methode "run" wie folgt:
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# load user data
users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(training_df)
del(users_id_rdd)
del(users_id_df)
del(items_id_rdd)
del(items_id_df)
# load model
model = ALSModel.load(model_file_path)
(wie unten)
Führen Sie wie folgt aus.
cps = CreatePredictedScore()
cps.run(
model_file_path,
predict_data_dir_path,
user_data_file_path,
item_data_file_path,
processed_training_data_file_path,
data_limit
)
Die Parameter sind wie folgt. In data_limit
wird N der besten N Scores angegeben. Wenn 0 oder weniger angegeben wird, werden die obersten N Daten nicht erstellt.
Legen Sie für "user_data_file_path" die CSV-Datei mit der Benutzer-ID in der ersten Spalte fest. Ich verwende eine Datei ohne Header. Legen Sie für "item_data_file_path" die CSV-Datei mit der Produkt-ID in der ersten Spalte fest. Ich verwende auch eine Datei ohne Header. Legen Sie eine CSV-Datei mit den folgenden Spalten in "Processed_training_data_file_path" fest.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
Eine CSV-Datei mit den folgenden Spalten wird in "Predict_Data_Dir_path" ausgegeben.
user_id | user_id | prediction |
---|---|---|
1xxxxxxxx3 | 3xxxxxxxx4 | 0.15594198 |
1xxxxxxxx3 | 3xxxxxxxx0 | 0.19135818 |
1xxxxxxxx3 | 3xxxxxxxx8 | 0.048197098 |
"Vorhersage" ist der vorhergesagte Wert.
Co-Filterung mit ALS in PySpark implementiert. Es tut mir leid, dass es schwer zu lesen ist, da ich die Methoden nicht aufgeteilt habe. Nächstes Mal werde ich erklären, wie das Modell bewertet wird.
Recommended Posts