[PYTHON] Mesurer la similitude du contenu avec Pyspark

introduction

J'ai écrit le code pour mesurer la similitude du contenu dans pyspark. Il y avait peu de documents japonais, je vais donc en prendre note. J'ai utilisé docker parce que c'était pénible d'introduire une étincelle à partir de zéro.

Présentation de pyspark avec docker

installation de docker

Pour présenter docker http://qiita.com/hshimo/items/e24b1fbfbf775ec7c941 Je l'ai mentionné.

Fondamentalement Get started with Docker for Mac Téléchargez et installez simplement plus de fichiers dmg.

Utilisation de l'image Docker Spark + Jupyter

Clonez l'image docker de spark + jupyter. Je l'ai cloné sur https://github.com/busbud/jupyter-docker-stacks/tree/master/all-spark-notebook. Comme indiqué dans le README

$ docker run -d -p 8888:8888 jupyter/all-spark-notebook -e GRANT_SUDO=yes

Tapez la commande ci-dessus sur le terminal. Cela fera apparaître un notebook jupyter qui peut utiliser pyspark sur localhost: 8888. (-E GRANT_SUDO = yes est une option qui vous permet d'utiliser jupyter sans privilèges d'utilisateur.)

Lorsque vous appuyez sur la commande docker ps

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                    NAMES
91fc42290759        jupyter/all-spark-notebook   "tini -- start-not..."   3 hours ago         Up 3 hours          0.0.0.0:8888->8888/tcp   nifty_bartik

Vous pouvez voir que le docker spécifié est en cours d'exécution.

code

Le code créé est https://github.com/kenchin110100/machine_learning/blob/master/samplePysparkSimilarity.ipynb C'est dedans.

Initialisation

Tout d'abord, importez les bibliothèques requises et initialisez pyspark.


# coding: utf-8
"""
Exemple de similitude avec pyspark
"""
import numpy as np
from pyspark import SQLContext, sql
import pyspark
from pyspark.sql import functions, Row
from pyspark.mllib.linalg import DenseVector

sc = pyspark.SparkContext('local[*]')
sqlContext = sql.SQLContext(sc)

sc est une instance requise pour utiliser le type RDD de pyspark, sqlContext est une instance requise pour utiliser le type DataFrame.

Créer des exemples de données

Ensuite, j'ai créé des exemples de données pour mesurer la similitude.

#Créer des exemples de données
samples = [['aaa', 'a', 30, 1,2,3,4,5]  + np.random.randn(5).tolist(),
    ['aaa', 'b', 30,2,1,3,4,1] + np.random.randn(5).tolist(),
    ['bbb', 'a', 30,4,5,3,2,4] + np.random.randn(5).tolist(),
    ['bbb', 'b', 30,1,2,4,3,1] + np.random.randn(5).tolist(),
    ['ccc', 'a', 30,4,5,2,1,2] + np.random.randn(5).tolist(),
    ['ccc', 'b', 30,1,2,5,4,1] + np.random.randn(5).tolist(),]

#Créer des noms de colonnes
colnames = [
    'mc', 'mtc', 'area_cd',
    'label1', 'label2', 'label3', 'label4', 'label5',
    'label6', 'label7', 'label8', 'label9', 'label10'
]
colnames1 = [col + '_1' for col in colnames]
colnames2 = [col + '_2' for col in colnames]

#Convertir les exemples de données créés en type pyspark DataFrame
df1 = sqlContext.createDataFrame(sc.parallelize(samples), colnames1)
df2 = sqlContext.createDataFrame(sc.parallelize(samples), colnames2)

Nous considérons mc et mtc comme des clés uniques et label1 à label10 comme des identités.

Le même échantillon de données est stocké dans deux blocs de données Il s'agit de créer une combinaison de similitude avec jointure. Après avoir converti les échantillons en type RDD avec sc.parallelize (samples), il est converti en type DataFrame avec createDataFrame.

Énumération des combinaisons

Utilisez ensuite une jointure de type DataFrame pour énumérer les combinaisons qui mesurent la similitude.

joined_df = df1.join(df2, df1.area_cd_1 == df2.area_cd_2).filter(functions.concat(df1.mc_1, df1.mtc_1) != functions.concat(df2.mc_2, df2.mtc_2))

Jointure de type DataFrame

df1.join(df2, <conditions>, 'left' or 'inner' or ...)

Vous pouvez le créer avec. Dans le code créé cette fois, en effectuant un filtre après la jointure, J'essaye de ne pas mesurer la similitude avec moi-même.

functions.concat(df1.mc_1, df1.mtc_1)

Ensuite, en combinant les deux clés mc et mtc que vous souhaitez rendre uniques, elles sont traitées comme une clé unique.

Calcul de la similitude

Nous calculerons la similitude en utilisant le DataFrame créé jusqu'à présent.

Définition des fonctions

Tout d'abord, définissez la fonction.

def match_sim(row1 ,row2):
    keys = row1.asDict().keys()
    total = len(keys)
    count = 0
    for key in keys:
        if row1[key] == row2[key]:
            count += 1
    return float(count)/total

def cosine_sim(vec1 ,vec2):
    dot = abs(vec1.dot(vec2))
    n1 = vec1.norm(None)
    n2 = vec1.norm(None)
    return float(dot/(n1*n2))

match_sim est une fonction pour mesurer la similitude des variables catégorielles. Passez le type Row de pyspark. Renvoie 1 s'ils correspondent, 0 s'ils ne correspondent pas et renvoie la valeur divisée par les nombres premiers comparés.

cosine_sim est une fonction pour calculer la similitude cosinus Passez le type DenseVector de pyspark.mllib.

Utilisez cette fonction pour calculer la similitude pour chaque ligne.

Calcul de la similitude

joined_rdd = joined_df.rdd.map(lambda x: (
    Row(mc_1=x.mc_1, mtc_1=x.mtc_1, mc_2=x.mc_2, mtc_2=x.mtc_2),
    Row(label1=x.label1_1, label2=x.label2_1, label3=x.label3_1, label4=x.label4_1, label5=x.label5_1),
    DenseVector([x.label6_1,x.label7_1,x.label8_1,x.label9_1,x.label10_1]),
    Row(label1=x.label1_2, label2=x.label2_2, label3=x.label3_2, label4=x.label4_2, label5=x.label5_2),
    DenseVector([x.label6_2,x.label7_2,x.label8_2,x.label9_2,x.label10_2])
                                         )) \
.map(lambda x: (x[0], match_sim(x[1], x[3]), cosine_sim(x[2], x[4]))) \
.map(lambda x: (x[0].mc_1, x[0].mtc_1, x[0].mc_2, x[0].mtc_2, x[1], x[2]))

Commencez par convertir le type DataFrame joint_df créé précédemment en type rdd et mappez-le (1ère ligne). Enregistrez 5 types de données pour chaque combinaison. Row (mc_1 = x.mc_1 ...) est une ligne (2e ligne) pour stocker une clé unique pour la similitude. Row (label1 = x.label1_1 ...) est la ligne de stockage des variables catégorielles (3e ligne) DenseVector (x.label6_1, ...) est un vecteur pour stocker des variables continues (4ème ligne) Les 5ème et 6ème lignes stockent les variables catégorielles et les variables continues de l'autre ligne pour la similitude.

Mappez davantage le RDD qui enregistre les 5 types de types de données ainsi créés (ligne 8). Avec x [0] tel quel, la similarité de correspondance est calculée pour x [1] et x [3], et la similitude cosinus est calculée pour x [2] et x [4]. Enfin, formatez-le pour qu'il puisse être à nouveau passé au type DataFrame (ligne 9).

Sortie de résultat

La table de similitude ainsi créée est la suivante.

sqlContext.createDataFrame(joined_rdd, ['tar_mc', 'tar_mtc', 'res_mc', 'res_mtc', 'match_sim', 'cosine_sim']).show()

+------+-------+------+-------+---------+--------------------+
|tar_mc|tar_mtc|res_mc|res_mtc|match_sim|          cosine_sim|
+------+-------+------+-------+---------+--------------------+
|   aaa|      a|   aaa|      b|      0.4|  0.2979433262317515|
|   aaa|      a|   bbb|      a|      0.2|  0.2161103600613806|
|   aaa|      a|   bbb|      b|      0.4|  0.6933162039799152|
|   aaa|      a|   ccc|      a|      0.0| 0.34941331375143353|
|   aaa|      a|   ccc|      b|      0.6|  0.5354750033557132|
|   aaa|      b|   aaa|      a|      0.4| 0.19428899651078324|
|   aaa|      b|   bbb|      a|      0.2| 0.10702152405150611|
|   aaa|      b|   bbb|      b|      0.2|  0.4033681950723296|
|   aaa|      b|   ccc|      a|      0.0| 0.20097172584128625|
|   aaa|      b|   ccc|      b|      0.4|  0.6861144738544892|
|   bbb|      a|   aaa|      a|      0.2|  0.3590385377694502|
|   bbb|      a|   aaa|      b|      0.2| 0.27266040008605663|
|   bbb|      a|   bbb|      b|      0.0|  1.1313716028957246|
|   bbb|      a|   ccc|      a|      0.4|0.009321106239696326|
|   bbb|      a|   ccc|      b|      0.0|  1.0017633803368193|
|   bbb|      b|   aaa|      a|      0.4|  0.2176828683879606|
|   bbb|      b|   aaa|      b|      0.2|   0.194213765887726|
|   bbb|      b|   bbb|      a|      0.0| 0.21381230488831227|
|   bbb|      b|   ccc|      a|      0.0| 0.21074015342537053|
|   bbb|      b|   ccc|      b|      0.6| 0.34536679942567616|
+------+-------+------+-------+---------+--------------------+
only showing top 20 rows

Pour chaque clé, la similarité et la similarité cosinus en fonction du degré de correspondance de la catégorie sont calculées.

À la fin

Cette fois, j'ai utilisé l'image jupyter-spark de docker pour créer un échantillon afin de mesurer la similitude entre les données. Il devrait y avoir une manière plus concise de l'écrire (comme l'utilisation de ColumnSimilarity de MLlib), mais pour diverses raisons, j'ai écrit ce code détourné cette fois-ci. Je suis un débutant de docker et d'étincelle, donc j'aimerais m'entraîner un peu plus à partir de maintenant.

Recommended Posts

Mesurer la similitude du contenu avec Pyspark
Co-filtrage avec PySpark
[Pour les débutants] Quantifier la similitude des phrases avec TF-IDF
Précautions lors du calcul avec une chaîne pour TmeStampType de PySpark
Compléter automatiquement le contenu YAML avec Python
La vie PySpark à partir de Docker