[PYTHON] Co-Filterung mit PySpark

Hallo zusammen. @best_not_best. Dieses Mal werde ich die Technologie vorstellen, die zu meiner Arbeit passt.

Überblick

Die Co-Filterung wird verwendet, um die Punktzahl für den Kauf eines Produkts durch einen Benutzer zu berechnen. Da der Rechenaufwand groß ist und die Verarbeitung umfangreicher Daten einige Zeit in Anspruch nimmt, führt PySpark eine verteilte Verarbeitung durch.

Umgebung

Fügen Sie spark-env.sh Folgendes hinzu, damit Sie Python 3.x auf der Spark-Seite aufrufen können.

export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3

Verfahren

  1. Holen Sie sich Trainingsdaten
  2. Verarbeiten Sie die Trainingsdaten (falls erforderlich)
  3. Erstellen Sie ein Modell
  4. Prognostizieren Sie die Punktzahl

Trainingsdaten abrufen

Erfassen Sie Trainingsdaten aus einer Datenbank usw. und erstellen Sie eine CSV-Datei mit 3 Spalten "Benutzer", "Element" und "Bewertung". Wenn Sie beispielsweise "Produktkaufdaten für den letzten Monat" verwenden, lauten die Daten "Benutzer-ID", "Produkt-ID" und "ob das Produkt gekauft wurde oder nicht (nicht gekauft: 0 / gekauft: 1)". Unten finden Sie ein Beispiel für eine CSV-Datei.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

Als Name für "Bewertung" ist "wie viel der Benutzer dem Produkt eine Bewertung gegeben hat" die ursprüngliche Verwendung, aber wie oben erwähnt "ob das Produkt gekauft wurde" oder "auf die Seite zugegriffen". Es ist auch möglich, mit Daten wie "ob oder nicht" zu implementieren. Im ersteren Fall handelt es sich um ein Modell, das vorhersagt, "wie viel der Benutzer das Produkt kauft" und im letzteren Fall, "wie oft der Benutzer die Seite besucht".

Trainingsdaten verarbeiten

Da die Benutzer-ID und die Produkt-ID nur bis zum Maximalwert von "int32" (2.147.483.647) verarbeitet werden können, wird die ID erneut nummeriert, wenn eine ID vorliegt, die diese überschreitet. Da es nur ganzzahlige Werte verarbeiten kann, wird es auf die gleiche Weise neu nummeriert, wenn es eine Zeichenfolge enthält. Wenn die ID ein ganzzahliger Wert ist und den Maximalwert von "int32" nicht überschreitet, überspringen Sie diesen Schritt.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""processing training data."""

from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

class ProcessTrainingData(object):
    """get training data from Redshift, and add sequence number to data."""

    def __get_action_log(
        self,
        sqlContext: SQLContext,
        unprocessed_data_file_path: str
    ) -> DataFrame:
        """get data."""
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(unprocessed_data_file_path)

        return df

    def run(
        self,
        unprocessed_data_file_path: str,
        training_data_dir_path: str
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('process_training_data')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # get data
        df = self.__get_action_log(sqlContext, unprocessed_data_file_path)

        # make sequence number of users
        unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        )

        # make sequence number of items
        unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        )

        # add sequence number of users, sequence number of items to data
        df = df.join(
            unique_users_df,
            df['user'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        df = df.join(
            unique_items_df,
            df['item'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # save
        saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
        df.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        return True

Lesen Sie die CSV der Trainingsdaten, fügen Sie die mit "zipWithIndex ()" neu nummerierten Spalten "Benutzer-ID" und "Produkt-ID" hinzu und speichern Sie sie als separate Datei. Führen Sie wie folgt aus.

ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)

Die Parameter sind wie folgt.

Wie oben erwähnt, setzen Sie "unprocessed_data_file_path" auf eine CSV-Datei mit den folgenden Spalten.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

Bei der Ausführung wird eine CSV-Datei mit den folgenden Spalten an training_data_dir_path ausgegeben.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

Erstellen Sie ein Modell

Erstellen und speichern Sie ein Modell für die Co-Filterung.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""create collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreateCfModel(object):
    """create collaborative filtering model."""

    def run(
        self,
        processed_training_data_file_path: str,
        model_dir_path: str,
        rank: int,
        max_iter: int,
        implicit_prefs: str,
        alpha: float,
        num_user_blocks: int,
        num_item_blocks: int
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_cf_model')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # create model
        als = ALS(
            rank=int(rank),
            maxIter=int(max_iter),
            implicitPrefs=bool(implicit_prefs),
            alpha=float(alpha),
            numUserBlocks=int(num_user_blocks),
            numItemBlocks=int(num_item_blocks),
            userCol='unique_user_id',
            itemCol='unique_item_id'
        )

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)

        # fitting
        model = als.fit(df)

        # save
        saved_data_dir_path = model_dir_path + 'als_model'
        model.write().overwrite().save(saved_data_dir_path)

        return True

Wenn Sie im vorherigen Abschnitt nicht neu nummerieren mussten, ordnen Sie die Spaltennamen in der CSV-Datei den Spaltennamen "Modell erstellen" und "Trainingsdaten laden" zu und ändern Sie sie wie folgt.

# create model
als = ALS(
    rank=int(rank),
    maxIter=int(max_iter),
    implicitPrefs=bool(implicit_prefs),
    alpha=float(alpha),
    numUserBlocks=int(num_user_blocks),
    numItemBlocks=int(num_item_blocks),
    userCol='user',
    itemCol='item'
)

# load training data
custom_schema = StructType([
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
])

Führen Sie wie folgt aus.

ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path,
    model_dir_path,
    rank,
    max_iter,
    implicit_prefs,
    alpha,
    num_user_blocks,
    num_item_blocks
)

Die Parameter sind wie folgt. rank, max_iter, implicit_prefs, alpha, num_user_blocks, num_item_blocks sind [PySpark ALS-Parameter](http://spark.apache.org/docs/latest/api/python/ Es wird pyspark.ml.html # pyspark.ml.recommendation.ALS sein.

Wenn im vorherigen Abschnitt nummeriert, legen Sie die CSV-Datei mit den folgenden Spalten in "verarbeiteter_training_data_file_path" fest.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

Wenn nicht nummeriert, legen Sie eine CSV-Datei mit den folgenden Spalten fest.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

Punktzahl vorhersagen

Laden Sie das gespeicherte Modell und prognostizieren Sie die Punktzahl aus der Kombination von Benutzer-ID und Produkt-ID. Bereiten Sie Listendaten für jede Benutzer-ID und Produkt-ID getrennt von den Trainingsdaten vor. Für das Vorhersageergebnis werden "alle Ergebnisse" und "Ergebnisse der besten N Bewertungen für jeden Benutzer" als CSV-Datei gespeichert. Diesmal werden die Benutzer-ID und die Produkt-ID, die in den ursprünglichen Trainingsdaten nicht vorhanden sind (= Aktionsprotokoll existiert nicht), nicht vorhergesagt.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""predict score from collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreatePredictedScore(object):
    """predict score from collaborative filtering model."""

    def run(
        self,
        model_file_path: str,
        predict_data_dir_path: str,
        user_data_file_path: str,
        item_data_file_path: str,
        processed_training_data_file_path: str,
        data_limit: int=-1
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_predicted_score')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # load user data
        users_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(user_data_file_path)
        users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
        users_id_df = sqlContext.createDataFrame(users_id_rdd)

        # load item data
        items_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(item_data_file_path)
        items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
        items_id_df = sqlContext.createDataFrame(items_id_rdd)

        # cross join user_id and item_id
        joined_df = users_id_df.join(items_id_df)
        joined_df.cache()

        # delete unnecessary variables
        del(users_df)
        del(users_id_rdd)
        del(users_id_df)
        del(items_df)
        del(items_id_rdd)
        del(items_id_df)

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        training_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # users
        unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        ).dropDuplicates()
        unique_users_df.cache()
        # items
        unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        ).dropDuplicates()
        unique_items_df.cache()

        # delete unnecessary variables
        del(training_df)
        del(unique_users_rdd)
        del(unique_items_rdd)

        # add unique user id
        joined_df = joined_df.join(
            unique_users_df,
            joined_df['user_id'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])

        # add unique item id
        joined_df = joined_df.join(
            unique_items_df,
            joined_df['item_id'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # load model
        model = ALSModel.load(model_file_path)

        # predict score
        predictions = model.transform(joined_df)
        all_predict_data = predictions\
            .select('user_id', 'item_id', 'prediction')\
            .filter('prediction > 0')

        # save
        # all score
        saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
        all_predict_data.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        # limited score
        data_limit = int(data_limit)
        if data_limit > 0:
            all_predict_data.registerTempTable('predictions')
            sql = 'SELECT user_id, item_id, prediction ' \
                + 'FROM ( ' \
                + '  SELECT user_id, item_id, prediction, dense_rank() ' \
                + '  OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
                + '  FROM predictions ' \
                + ') tmp WHERE rank <= %d' % (data_limit)
            limited_predict_data = sqlContext.sql(sql)

            saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
            limited_predict_data.write\
                .format('csv')\
                .mode('overwrite')\
                .options(header='true')\
                .save(saved_data_file_path)

        return True

Die Kombination von Vorhersagezielen wird durch das folgende Verfahren erstellt.

  1. Erstellen Sie aus den einzelnen Listendaten alle Kombinationen aus Benutzer-ID und Produkt-ID
  2. Lesen Sie die Trainingsdaten, fügen Sie die neu nummerierte ID zur Kombination von 1 hinzu und löschen Sie die Benutzer-ID und die Produkt-ID, die nicht nummeriert werden konnten.

Wenn Sie im ersten Abschnitt nicht neu nummerieren mussten, ändern Sie die Methode "run" wie folgt:

def run(
    self,
    model_file_path: str,
    predict_data_dir_path: str,
    processed_training_data_file_path: str,
    data_limit: int=-1
) -> bool:
    """execute."""
    # make spark context
    spark = SparkSession\
        .builder\
        .appName('create_predicted_score')\
        .config('spark.sql.crossJoin.enabled', 'true')\
        .config('spark.debug.maxToStringFields', 500)\
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

    # load training data
    custom_schema = StructType([
        StructField('user', IntegerType(), True),
        StructField('item', IntegerType(), True),
        StructField('rating', FloatType(), True),
    ])
    training_df = sqlContext\
        .read\
        .format('csv')\
        .options(header='true')\
        .load(processed_training_data_file_path, schema=custom_schema)

    # load user data
    users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
    users_id_df = sqlContext.createDataFrame(users_id_rdd)

    # load item data
    items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
    items_id_df = sqlContext.createDataFrame(items_id_rdd)

    # cross join user_id and item_id
    joined_df = users_id_df.join(items_id_df)
    joined_df.cache()

    # delete unnecessary variables
    del(training_df)
    del(users_id_rdd)
    del(users_id_df)
    del(items_id_rdd)
    del(items_id_df)

    # load model
    model = ALSModel.load(model_file_path)
(wie unten)

Führen Sie wie folgt aus.

cps = CreatePredictedScore()
cps.run(
    model_file_path,
    predict_data_dir_path,
    user_data_file_path,
    item_data_file_path,
    processed_training_data_file_path,
    data_limit
)

Die Parameter sind wie folgt. In data_limit wird N der besten N Scores angegeben. Wenn 0 oder weniger angegeben wird, werden die obersten N Daten nicht erstellt.

Legen Sie für "user_data_file_path" die CSV-Datei mit der Benutzer-ID in der ersten Spalte fest. Ich verwende eine Datei ohne Header. Legen Sie für "item_data_file_path" die CSV-Datei mit der Produkt-ID in der ersten Spalte fest. Ich verwende auch eine Datei ohne Header. Legen Sie eine CSV-Datei mit den folgenden Spalten in "Processed_training_data_file_path" fest.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

Eine CSV-Datei mit den folgenden Spalten wird in "Predict_Data_Dir_path" ausgegeben.

user_id user_id prediction
1xxxxxxxx3 3xxxxxxxx4 0.15594198
1xxxxxxxx3 3xxxxxxxx0 0.19135818
1xxxxxxxx3 3xxxxxxxx8 0.048197098

"Vorhersage" ist der vorhergesagte Wert.

Zusammenfassung

Co-Filterung mit ALS in PySpark implementiert. Es tut mir leid, dass es schwer zu lesen ist, da ich die Methoden nicht aufgeteilt habe. Nächstes Mal werde ich erklären, wie das Modell bewertet wird.

Referenzlink

Recommended Posts

Co-Filterung mit PySpark
Koordinierte Filterung mit Hauptkomponentenanalyse und K-Mittel-Clustering
Lernen Sie das kollaborative Filtern zusammen mit Coursera-Materialien für maschinelles Lernen
Ich habe versucht, Co-Filtering (Empfehlung) mit Redis und Python zu implementieren
Benutzerbasierte Co-Filterung mit Python
PySpark Leben beginnt mit Docker
[Empfehlung] Inhaltsbasierte Filterung und kooperative Filterung
[Python] Verwenden von OpenCV mit Python (Bildfilterung)
Messen Sie die Ähnlichkeit von Inhalten mit Pyspark
Textfilterung mit naiven Buchten von sklearn