[PYTHON] Recommandation de film utilisant le co-filtrage de Spark MLlib

Comme les chances de toucher Spark augmentent, j'ai essayé de mettre en place un système de recommandation utilisant MLlib comme inventaire des connaissances. Spark's Movie Recommendation, qui est utilisé dans divers didacticiels MLlib tels que SparkSamit2014, mais edX Introduction to Big Data avec Apache Spar semblait être bon en termes de contenu, donc je l'ai implémenté en l'utilisant comme sujet. Ce cours est implémenté dans Spark 1.3.1, mais il est un peu trop ancien, donc les fonctions pouvant être utilisées dans 1.6.1 sont modifiées en les utilisant.

Procédure approximative

① Préparation des données Divisez les données d'origine en données d'entraînement, d'évaluation et de test (2) Affichez le film avec la note moyenne la plus élevée parmi les films avec une note de 500 ou plus ③ Mise en place du filtrage coopératif ④ Ajoutez-vous aux données d'entraînement en tant qu'ID utilisateur "0" et évaluez votre film préféré ⑤ Demandez à l'algorithme de recommander un film basé sur votre propre évaluation

code

Tout d'abord, lisez les données, divisez-les et regardez-les. Les données à utiliser incluent le jeu de données de film constitué de UserID :: MovieID :: Rating :: Timestamp et le jeu de données de classement constitué de MovieID :: Title :: Genres.

numPartitions = 2

ratingFileName = "ratings.txt"
rawRatings = sc.textFile(ratingFileName, numPartitions)

moviesFileName = "movies.txt"
rawMovies = sc.textFile(moviesFileName, numPartitions)

def get_ratings_tuple(entry):
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    items = entry.split('::')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

There are 1000209 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]

Nous avons plus de 500 critiques pour découvrir les types de films les plus populaires et sélectionner 20 des plus populaires. Créez un RDD qui inclut le MovieID, le nombre de notes et la note moyenne, et joignez-le au movieRDD pour générer un taple avec (note moyenne, titre du film, note). Après cela, seuls les titres avec 500 évaluations ou plus sont sélectionnés et triés en fonction du score d'évaluation moyen. Je crée une fonction appelée sortFunction () pour trier les points de classement et les titres de films par ordre alphabétique.

def getCountsAndAverages(IDandRatingsTuple):
    aggr_result = (IDandRatingsTuple[0], (len(IDandRatingsTuple[1]), float(sum(IDandRatingsTuple[1])) / len(IDandRatingsTuple[1])))
    return aggr_result


movieNameWithAvgRatingsRDD = (ratingsRDD
                          .map(lambda x:(x[1], x[2]))
                          .groupByKey()
                          .map(getCountsAndAverages)
                          .join(moviesRDD)
                          .map(lambda x:(x[1][0][1], x[1][1], x[0])))


print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)



def sortFunction(tuple):
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: (x[2] > 500))
                                    .sortBy(sortFunction, False))

print 'Movies with highest ratings:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(10):
    print ratingsTuple


movieNameWithAvgRatingsRDD: [(3.49618320610687, u'Great Mouse Detective, The (1986)', 2048), (3.7871690427698574, u'Moonstruck (1987)', 3072), (2.7294117647058824, u'Waiting to Exhale (1995)', 4)]


Movies with highest ratings:
(average rating, movie name, number of reviews)
(5.0, u'Ulysses (Ulisse) (1954)', 3172)
(5.0, u'Song of Freedom (1936)', 3382)
(5.0, u'Smashing Time (1967)', 3233)
(5.0, u'Schlafes Bruder (Brother of Sleep) (1995)', 989)
(5.0, u'One Little Indian (1973)', 3607)
(5.0, u'Lured (1947)', 3656)
(5.0, u'Gate of Heavenly Peace, The (1995)', 787)
(5.0, u'Follow the Bitch (1998)', 1830)
(5.0, u'Bittersweet Motel (2000)', 3881)
(5.0, u'Baby, The (1973)', 3280)

Ensuite, faites des recommandations en utilisant le co-filtrage. MLlib a une bibliothèque qui utilise ALS, donc je vais l'utiliser tel quel.

Pour le moment, divisez l'ensemble de données en données d'entraînement, d'évaluation et de test pour la création de modèles.

trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

validationForPredictRDD = validationRDD.map(lambda x: (x[0], x[1]))
print validationForPredictRDD.take(3)

actualReformattedRDD = validationRDD.map(lambda x: ((x[0], x[1]), x[2]))
print actualReformattedRDD.take(3)


Training: 600364, validation: 199815, test: 200030

[(1, 661, 3.0), (1, 914, 3.0), (1, 1197, 3.0)]
[(1, 3408, 4.0), (1, 2355, 5.0), (1, 938, 4.0)]
[(1, 1193, 5.0), (1, 1287, 5.0), (1, 2804, 5.0)]
[(1, 3408), (1, 2355), (1, 938)]
[((1, 3408), 4.0), ((1, 2355), 5.0), ((1, 938), 4.0)]

C'est la construction de modèles, mais la recherche de grille recherche de meilleurs paramètres. Aussi, pour l'évaluation du modèle, [Racine carrée de l'erreur quadratique moyenne (RMSE) parmi les indicateurs fournis dans MLlib](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html Utilisez # pyspark.mllib.evaluation.RegressionMetrics).

from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics

seed = 5L
iterations = [5,7,10]
regularizationParameter = 0.1
ranks = [4, 8, 12]
RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.03

minRMSE = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    for iteration in iterations:
        model = ALS.train(trainingRDD,
                          rank,
                          seed=seed,
                          iteration,
                          lambda_=regularizationParameter)
        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
        predictedReformattedRDD = predictedRatingsRDD.map(lambda x: ((x[0], x[1]), x[2]))
    
        predictionAndObservations = (predictedReformattedRDD
                                     .join(actualReformattedRDD)
                                     .map(lambda x: x[1]))
    
        metrics = RegressionMetrics(predictionAndObservations)
        RMSE = metrics.rootMeanSquaredError
        RMSEs[err] = RMSE
        err += 1
        
        print 'For rank %s and itereation %s, the RMSE is %s' % (rank, iteration, RMSE)
        if RMSE < minRMSE:
            minRMSE = RMSE
            bestIteretioin = iteretaion
            bestRank = rank

print 'The best model was trained with rank %s and iteratin %s'  % (bestRank, bestIteretion)
            

For rank 4 and itereation 5, the RMSE is 0.903719946201
For rank 4 and itereation 7, the RMSE is 0.893408395534
For rank 4 and itereation 10, the RMSE is 0.886260195446
For rank 8 and itereation 5, the RMSE is 0.89365207233
For rank 8 and itereation 7, the RMSE is 0.883901283207
For rank 8 and itereation 10, the RMSE is 0.876701840863
For rank 12 and itereation 5, the RMSE is 0.887127524585
For rank 12 and itereation 7, the RMSE is 0.87863327159
For rank 12 and itereation 10, the RMSE is 0.872532683651
The best model was trained with rank 12 and iteratin 10

Vérifiez avec les données de test. Comme il n'y a pas de problème avec RMSE, il semble que le surapprentissage ne se produira pas.

bestModel = ALS.train(trainingRDD,
                      bestRank,
                      seed=seed,
                      iterations=bestIteretion,
                      lambda_=regularizationParameter)

testForPredictingRDD = testRDD.map(lambda x: (x[0], x[1]))
testReformattedRDD = testRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
predictedTestReformattedRDD = predictedTestRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictionAndObservationsTest = (predictedTestReformattedRDD
                             .join(testReformattedRDD)
                             .map(lambda x: x[1]))

metrics = RegressionMetrics(predictionAndObservationsTest)
testRMSE = metrics.rootMeanSquaredError

print 'The model had a RMSE on the test set of %s' % testRMSE


The model had a RMSE on the test set of 0.87447554868

Enfin, évaluez votre film préféré et ajoutez-le aux données en tant qu'ID utilisateur "0". Demandez-leur de recommander un film en fonction de cette note. Prenez votre film préféré de movieRDD, évaluez-le, ajoutez-le et utilisez-le pour entraîner votre modèle. Ensuite, obtenez la prédiction de l'ID utilisateur "0" et affichez en premier celui avec le score d'évaluation le plus élevé.

myUserID = 0
myRatedMovies = [(myUserID, 1, 5), #Toy Story
                 (myUserID, 648, 3), # Mission Impossible
                 (myUserID, 1580, 4), # Men In Black
                 (myUserID, 1097, 3), # ET
                 (myUserID, 3247, 5)] #Sister Act

myRatingsRDD = sc.parallelize(myRatedMovies)
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

myRatingsModel = ALS.train(trainingWithMyRatingsRDD,
                           bestRank, 
                           seed=seed,
                           iterations=bestIteretion,
                           lambda_=regularizationParameter)


myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x: x[0] not in [x[1] for x in myRatedMovies])
                      .map(lambda x: (myUserID, x[0])))

predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1], x[2]))

movieCountsRDD = (ratingsRDD
                  .map(lambda x:(x[1], x[2]))
                  .groupByKey()
                  .map(getCountsAndAverages)
                  .map(lambda x: (x[0], x[1][0])))


#Marge PredictedRDD and CountsRDD
predictedWithCountsRDD  = (predictedRDD
                           .join(movieCountsRDD))


ratingsWithNamesRDD = (predictedWithCountsRDD
                       .filter(lambda x: x[1][1] > 75)
                       .join(moviesRDD)
                       .map(lambda x: (x[1][0][0], x[1][1], x[1][0][1])))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(10, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

My highest rated movies as predicted (for movies with more than 75 reviews):
(4.74482593848827, u'Sound of Music, The (1965)', 882)
(4.580669496447569, u'Mary Poppins (1964)', 1011)
(4.486424714752521, u'Beauty and the Beast (1991)', 1060)
(4.478042748281928, u'Mulan (1998)', 490)
(4.477453571213953, u'Toy Story 2 (1999)', 1585)
(4.439390718632932, u'Fantasia 2000 (1999)', 453)
(4.405894101045507, u'FairyTale: A True Story (1997)', 87)
(4.4030583744108425, u"Singin' in the Rain (1952)", 751)
(4.390333274084924, u'Top Hat (1935)', 251)
(4.347757079374581, u'Gone with the Wind (1939)', 1156)

J'évalue des films solides, donc des films solides sont recommandés lol

Recommended Posts

Recommandation de film utilisant le co-filtrage de Spark MLlib
[Recommandation] Filtrage basé sur le contenu et filtrage coopératif
[Recommandation] Résumé des avantages et des inconvénients de la méthode de filtrage / mise en œuvre basée sur le contenu et coopérative
Recommandation de poésie
Pensez aux spécifications pour le développement de moteurs de recommandation de co-filtrage
J'ai essayé de mettre en œuvre le co-filtrage (recommandation) avec redis et python