[PYTHON] Filmempfehlung mit Co-Filterung von Spark MLlib

Da die Chancen, Spark zu berühren, zunehmen, habe ich versucht, ein Empfehlungssystem mit MLlib als Wissensinventar zu implementieren. Spark's Movie Recommendation, die in verschiedenen MLlib-Tutorials wie SparkSamit2014, aber edX Introduction zu Big Data mit Apache Spar schien inhaltlich gut zu sein, daher habe ich es implementiert, während ich es als Thema verwendet habe. Dieser Kurs ist in Spark 1.3.1 implementiert, aber etwas zu alt, sodass die Funktionen, die in 1.6.1 verwendet werden können, durch ihre Verwendung geändert werden.

Grobe Prozedur

① Datenaufbereitung Teilen Sie die Originaldaten in Trainings-, Evaluierungs- und Testdaten auf (2) Zeigen Sie den Film mit der höchsten Durchschnittsbewertung von Filmen mit einer Bewertung von 500 oder mehr an ③ Implementierung einer kooperativen Filterung ④ Fügen Sie sich als Benutzer-ID "0" zu den Trainingsdaten hinzu und bewerten Sie Ihren Lieblingsfilm ⑤ Lassen Sie den Algorithmus einen Film empfehlen, der auf Ihrer eigenen Bewertung basiert

Code

Lesen Sie zuerst die Daten, teilen Sie sie und sehen Sie sie sich an. Zu den zu verwendenden Daten gehören der Filmdatensatz bestehend aus UserID :: MovieID :: Rating :: Timestamp und der Bewertungsdatensatz bestehend aus MovieID :: Title :: Genres.

numPartitions = 2

ratingFileName = "ratings.txt"
rawRatings = sc.textFile(ratingFileName, numPartitions)

moviesFileName = "movies.txt"
rawMovies = sc.textFile(moviesFileName, numPartitions)

def get_ratings_tuple(entry):
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    items = entry.split('::')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

There are 1000209 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]

Wir haben über 500 Rezensionen, um herauszufinden, welche Art von Filmen beliebt sind, und wählen Sie 20 der beliebtesten aus. Erstellen Sie eine RDD, die die MovieID, die Anzahl der Bewertungen und die durchschnittliche Bewertung enthält, und verknüpfen Sie sie mit der movieRDD, um einen Taple mit (durchschnittliche Bewertung, Filmtitel, Bewertung) zu generieren. Danach werden nur die Titel mit 500 oder mehr Bewertungen ausgewählt und nach der durchschnittlichen Bewertungspunktzahl sortiert. Ich erstelle eine Funktion namens sortFunction (), um die Bewertungspunkte und Filmtitel in alphabetischer Reihenfolge zu sortieren.

def getCountsAndAverages(IDandRatingsTuple):
    aggr_result = (IDandRatingsTuple[0], (len(IDandRatingsTuple[1]), float(sum(IDandRatingsTuple[1])) / len(IDandRatingsTuple[1])))
    return aggr_result


movieNameWithAvgRatingsRDD = (ratingsRDD
                          .map(lambda x:(x[1], x[2]))
                          .groupByKey()
                          .map(getCountsAndAverages)
                          .join(moviesRDD)
                          .map(lambda x:(x[1][0][1], x[1][1], x[0])))


print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)



def sortFunction(tuple):
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: (x[2] > 500))
                                    .sortBy(sortFunction, False))

print 'Movies with highest ratings:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(10):
    print ratingsTuple


movieNameWithAvgRatingsRDD: [(3.49618320610687, u'Great Mouse Detective, The (1986)', 2048), (3.7871690427698574, u'Moonstruck (1987)', 3072), (2.7294117647058824, u'Waiting to Exhale (1995)', 4)]


Movies with highest ratings:
(average rating, movie name, number of reviews)
(5.0, u'Ulysses (Ulisse) (1954)', 3172)
(5.0, u'Song of Freedom (1936)', 3382)
(5.0, u'Smashing Time (1967)', 3233)
(5.0, u'Schlafes Bruder (Brother of Sleep) (1995)', 989)
(5.0, u'One Little Indian (1973)', 3607)
(5.0, u'Lured (1947)', 3656)
(5.0, u'Gate of Heavenly Peace, The (1995)', 787)
(5.0, u'Follow the Bitch (1998)', 1830)
(5.0, u'Bittersweet Motel (2000)', 3881)
(5.0, u'Baby, The (1973)', 3280)

Geben Sie als Nächstes Empfehlungen mithilfe der Co-Filterung ab. MLlib hat eine Bibliothek, die [ALS] verwendet (http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html), daher werde ich sie so verwenden, wie sie ist.

Teilen Sie den Datensatz vorerst in Trainings-, Evaluierungs- und Testdaten für die Modellbildung auf.

trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

validationForPredictRDD = validationRDD.map(lambda x: (x[0], x[1]))
print validationForPredictRDD.take(3)

actualReformattedRDD = validationRDD.map(lambda x: ((x[0], x[1]), x[2]))
print actualReformattedRDD.take(3)


Training: 600364, validation: 199815, test: 200030

[(1, 661, 3.0), (1, 914, 3.0), (1, 1197, 3.0)]
[(1, 3408, 4.0), (1, 2355, 5.0), (1, 938, 4.0)]
[(1, 1193, 5.0), (1, 1287, 5.0), (1, 2804, 5.0)]
[(1, 3408), (1, 2355), (1, 938)]
[((1, 3408), 4.0), ((1, 2355), 5.0), ((1, 938), 4.0)]

Es ist Modellbildung, aber die Rastersuche sucht nach besseren Parametern. Für die Modellbewertung auch Quadratwurzel des mittleren Quadratfehlers (RMSE) unter den in MLlib bereitgestellten Indikatoren Verwenden Sie # pyspark.mllib.evaluation.RegressionMetrics).

from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics

seed = 5L
iterations = [5,7,10]
regularizationParameter = 0.1
ranks = [4, 8, 12]
RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.03

minRMSE = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    for iteration in iterations:
        model = ALS.train(trainingRDD,
                          rank,
                          seed=seed,
                          iteration,
                          lambda_=regularizationParameter)
        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
        predictedReformattedRDD = predictedRatingsRDD.map(lambda x: ((x[0], x[1]), x[2]))
    
        predictionAndObservations = (predictedReformattedRDD
                                     .join(actualReformattedRDD)
                                     .map(lambda x: x[1]))
    
        metrics = RegressionMetrics(predictionAndObservations)
        RMSE = metrics.rootMeanSquaredError
        RMSEs[err] = RMSE
        err += 1
        
        print 'For rank %s and itereation %s, the RMSE is %s' % (rank, iteration, RMSE)
        if RMSE < minRMSE:
            minRMSE = RMSE
            bestIteretioin = iteretaion
            bestRank = rank

print 'The best model was trained with rank %s and iteratin %s'  % (bestRank, bestIteretion)
            

For rank 4 and itereation 5, the RMSE is 0.903719946201
For rank 4 and itereation 7, the RMSE is 0.893408395534
For rank 4 and itereation 10, the RMSE is 0.886260195446
For rank 8 and itereation 5, the RMSE is 0.89365207233
For rank 8 and itereation 7, the RMSE is 0.883901283207
For rank 8 and itereation 10, the RMSE is 0.876701840863
For rank 12 and itereation 5, the RMSE is 0.887127524585
For rank 12 and itereation 7, the RMSE is 0.87863327159
For rank 12 and itereation 10, the RMSE is 0.872532683651
The best model was trained with rank 12 and iteratin 10

Überprüfen Sie mit Testdaten. Da es mit RMSE kein Problem gibt, scheint es nicht zu einem Überlernen zu kommen.

bestModel = ALS.train(trainingRDD,
                      bestRank,
                      seed=seed,
                      iterations=bestIteretion,
                      lambda_=regularizationParameter)

testForPredictingRDD = testRDD.map(lambda x: (x[0], x[1]))
testReformattedRDD = testRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
predictedTestReformattedRDD = predictedTestRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictionAndObservationsTest = (predictedTestReformattedRDD
                             .join(testReformattedRDD)
                             .map(lambda x: x[1]))

metrics = RegressionMetrics(predictionAndObservationsTest)
testRMSE = metrics.rootMeanSquaredError

print 'The model had a RMSE on the test set of %s' % testRMSE


The model had a RMSE on the test set of 0.87447554868

Bewerten Sie abschließend Ihren Lieblingsfilm und fügen Sie ihn als Benutzer-ID "0" zu den Daten hinzu. Bitten Sie sie, einen Film zu empfehlen, der auf dieser Bewertung basiert. Nehmen Sie Ihren Lieblingsfilm aus movieRDD, bewerten Sie ihn, fügen Sie ihn hinzu und trainieren Sie Ihr Modell damit. Holen Sie sich danach die Vorhersage für die Benutzer-ID "0" und zeigen Sie zuerst die mit der höchsten Bewertungspunktzahl an.

myUserID = 0
myRatedMovies = [(myUserID, 1, 5), #Toy Story
                 (myUserID, 648, 3), # Mission Impossible
                 (myUserID, 1580, 4), # Men In Black
                 (myUserID, 1097, 3), # ET
                 (myUserID, 3247, 5)] #Sister Act

myRatingsRDD = sc.parallelize(myRatedMovies)
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

myRatingsModel = ALS.train(trainingWithMyRatingsRDD,
                           bestRank, 
                           seed=seed,
                           iterations=bestIteretion,
                           lambda_=regularizationParameter)


myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x: x[0] not in [x[1] for x in myRatedMovies])
                      .map(lambda x: (myUserID, x[0])))

predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1], x[2]))

movieCountsRDD = (ratingsRDD
                  .map(lambda x:(x[1], x[2]))
                  .groupByKey()
                  .map(getCountsAndAverages)
                  .map(lambda x: (x[0], x[1][0])))


#Marge PredictedRDD and CountsRDD
predictedWithCountsRDD  = (predictedRDD
                           .join(movieCountsRDD))


ratingsWithNamesRDD = (predictedWithCountsRDD
                       .filter(lambda x: x[1][1] > 75)
                       .join(moviesRDD)
                       .map(lambda x: (x[1][0][0], x[1][1], x[1][0][1])))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(10, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

My highest rated movies as predicted (for movies with more than 75 reviews):
(4.74482593848827, u'Sound of Music, The (1965)', 882)
(4.580669496447569, u'Mary Poppins (1964)', 1011)
(4.486424714752521, u'Beauty and the Beast (1991)', 1060)
(4.478042748281928, u'Mulan (1998)', 490)
(4.477453571213953, u'Toy Story 2 (1999)', 1585)
(4.439390718632932, u'Fantasia 2000 (1999)', 453)
(4.405894101045507, u'FairyTale: A True Story (1997)', 87)
(4.4030583744108425, u"Singin' in the Rain (1952)", 751)
(4.390333274084924, u'Top Hat (1935)', 251)
(4.347757079374581, u'Gone with the Wind (1939)', 1156)

Ich bewerte solide Filme, daher werden solide Filme empfohlen, lol

Recommended Posts

Filmempfehlung mit Co-Filterung von Spark MLlib
[Empfehlung] Inhaltsbasierte Filterung und kooperative Filterung
[Empfehlung] Zusammenfassung der Vor- und Nachteile der inhaltsbasierten und kooperativen Filter- / Implementierungsmethode
Empfehlung der Poesie
Denken Sie an Spezifikationen für die Entwicklung von Co-Filtering-Empfehlungs-Engines
Ich habe versucht, Co-Filtering (Empfehlung) mit Redis und Python zu implementieren