[Python] Measuring content similarity with PySpark

Introduction

I wrote some code to measure content similarity in PySpark. There is little documentation on this in Japanese, so I'm leaving a note here. I used Docker because setting up Spark from scratch was a hassle.

Setting up PySpark with Docker

Installing Docker

To install Docker, I referred to http://qiita.com/hshimo/items/e24b1fbfbf775ec7c941.

Basically, you just download the dmg from Get started with Docker for Mac and install it.

Using a Spark + Jupyter Docker image

Get a Spark + Jupyter Docker image. I used https://github.com/busbud/jupyter-docker-stacks/tree/master/all-spark-notebook. As noted in the README, run the following command in a terminal:

$ docker run -d -p 8888:8888 -e GRANT_SUDO=yes jupyter/all-spark-notebook

This brings up a Jupyter notebook with PySpark available at localhost:8888. (-e GRANT_SUDO=yes is an option that lets the notebook user run commands with sudo inside the container.)

When you run the docker ps command,

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                    NAMES
91fc42290759        jupyter/all-spark-notebook   "tini -- start-not..."   3 hours ago         Up 3 hours          0.0.0.0:8888->8888/tcp   nifty_bartik

you can see that the container you started is running.
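
If the notebook asks for a login token when you open localhost:8888, the startup URL containing the token can be read from the container log. This is a general Docker command, not something from the original post; the container name comes from the docker ps output above:

$ docker logs nifty_bartik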

Code

The full code is available at https://github.com/kenchin110100/machine_learning/blob/master/samplePysparkSimilarity.ipynb.

Initialization

First, import the required libraries and initialize pyspark.


# coding: utf-8
"""
Sample for similarity with pyspark
"""
import numpy as np
from pyspark import SQLContext, sql
import pyspark
from pyspark.sql import functions, Row
from pyspark.mllib.linalg import DenseVector

sc = pyspark.SparkContext('local[*]')
sqlContext = sql.SQLContext(sc)

sc is the instance needed to work with PySpark's RDD type, and sqlContext is the instance needed to work with the DataFrame type.
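
As a quick illustration of the two (a minimal example, not part of the original notebook), sc is used to build RDDs and sqlContext to build DataFrames:

# Build an RDD through the SparkContext
rdd = sc.parallelize([1, 2, 3])
print(rdd.collect())  # [1, 2, 3]

# Build a DataFrame through the SQLContext
df = sqlContext.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val'])
df.show()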

Creating sample data

Next, I created sample data to measure the similarity.

# Creating sample data
samples = [['aaa', 'a', 30, 1, 2, 3, 4, 5] + np.random.randn(5).tolist(),
           ['aaa', 'b', 30, 2, 1, 3, 4, 1] + np.random.randn(5).tolist(),
           ['bbb', 'a', 30, 4, 5, 3, 2, 4] + np.random.randn(5).tolist(),
           ['bbb', 'b', 30, 1, 2, 4, 3, 1] + np.random.randn(5).tolist(),
           ['ccc', 'a', 30, 4, 5, 2, 1, 2] + np.random.randn(5).tolist(),
           ['ccc', 'b', 30, 1, 2, 5, 4, 1] + np.random.randn(5).tolist()]

# Creating column names
colnames = [
    'mc', 'mtc', 'area_cd',
    'label1', 'label2', 'label3', 'label4', 'label5',
    'label6', 'label7', 'label8', 'label9', 'label10'
]
colnames1 = [col + '_1' for col in colnames]
colnames2 = [col + '_2' for col in colnames]

# Convert the created sample data to PySpark DataFrame type
df1 = sqlContext.createDataFrame(sc.parallelize(samples), colnames1)
df2 = sqlContext.createDataFrame(sc.parallelize(samples), colnames2)

We consider mc and mtc as unique keys and label1 to label10 as features.

The same sample data is stored in two DataFrames so that the combinations for the similarity calculation can be created with a join. samples is first converted to an RDD with sc.parallelize(samples), then to a DataFrame with createDataFrame.
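
To confirm the DataFrames look as expected, you can inspect the schema and a few rows (a quick check, not in the original notebook):

# Columns are mc_1, mtc_1, area_cd_1, label1_1, ..., label10_1
df1.printSchema()
df1.show(3)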

Enumeration of combinations

Next, use a DataFrame join to enumerate the combinations for which similarity will be measured.

joined_df = df1.join(df2, df1.area_cd_1 == df2.area_cd_2) \
    .filter(functions.concat(df1.mc_1, df1.mtc_1) != functions.concat(df2.mc_2, df2.mtc_2))

A join between DataFrames is written as

df1.join(df2, <conditions>, 'left' or 'inner' or ...)

In this code, applying a filter after the join ensures that a row's similarity with itself is not measured.

functions.concat(df1.mc_1, df1.mtc_1)

concatenates the two keys mc and mtc that together identify a row, so they can be treated as a single unique key in the comparison.
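
Since all six rows share area_cd = 30, the join produces 6 x 6 = 36 pairs and the filter removes the 6 self-pairs, leaving 30 combinations. A quick check (not in the original notebook):

# 30 combinations remain after removing self-pairs
print(joined_df.count())
joined_df.select('mc_1', 'mtc_1', 'mc_2', 'mtc_2').show(5)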

Calculation of similarity

We will calculate the similarity using the DataFrame created so far.

Function definition

First, define the similarity functions.

def match_sim(row1, row2):
    # Ratio of matching values across the compared categorical columns
    keys = row1.asDict().keys()
    total = len(keys)
    count = 0
    for key in keys:
        if row1[key] == row2[key]:
            count += 1
    return float(count)/total

def cosine_sim(vec1, vec2):
    # Cosine similarity between two DenseVectors
    dot = abs(vec1.dot(vec2))
    n1 = vec1.norm(None)
    n2 = vec2.norm(None)
    return float(dot/(n1*n2))

match_sim is a function for measuring the similarity of categorical variables. It takes PySpark Row objects: each feature that matches counts as 1 and each mismatch as 0, and the total is divided by the number of compared features.

cosine_sim is a function for computing cosine similarity. It takes pyspark.mllib DenseVector objects.

These two functions are used to calculate the similarity for each pair of rows.
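
As a small standalone check of the two functions (toy values chosen here for illustration):

# Two of the three keys match -> 2/3
r1 = Row(label1=1, label2=2, label3=3)
r2 = Row(label1=1, label2=0, label3=3)
print(match_sim(r1, r2))   # 0.666...

# Parallel vectors -> cosine similarity of 1.0
v1 = DenseVector([1.0, 2.0, 3.0])
v2 = DenseVector([2.0, 4.0, 6.0])
print(cosine_sim(v1, v2))  # 1.0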

Calculation of similarity

joined_rdd = joined_df.rdd.map(lambda x: (
    Row(mc_1=x.mc_1, mtc_1=x.mtc_1, mc_2=x.mc_2, mtc_2=x.mtc_2),
    Row(label1=x.label1_1, label2=x.label2_1, label3=x.label3_1, label4=x.label4_1, label5=x.label5_1),
    DenseVector([x.label6_1, x.label7_1, x.label8_1, x.label9_1, x.label10_1]),
    Row(label1=x.label1_2, label2=x.label2_2, label3=x.label3_2, label4=x.label4_2, label5=x.label5_2),
    DenseVector([x.label6_2, x.label7_2, x.label8_2, x.label9_2, x.label10_2])
    )) \
    .map(lambda x: (x[0], match_sim(x[1], x[3]), cosine_sim(x[2], x[4]))) \
    .map(lambda x: (x[0].mc_1, x[0].mtc_1, x[0].mc_2, x[0].mtc_2, x[1], x[2]))

First, convert the DataFrame joined_df created earlier to an RDD and map it (line 1). Five objects are stored for each combination: Row(mc_1=x.mc_1, ...) holds the unique keys of the pair (line 2), Row(label1=x.label1_1, ...) holds the categorical variables (line 3), and DenseVector([x.label6_1, ...]) holds the continuous variables (line 4). Lines 5 and 6 store the categorical and continuous variables of the other row in the pair.

Then map again over the RDD holding these five objects (line 8): x[0] is kept as is, the match similarity is computed from x[1] and x[3], and the cosine similarity from x[2] and x[4]. Finally, the result is formatted so it can be converted back to a DataFrame (line 9).
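
To sanity-check the intermediate result before converting back to a DataFrame, you can pull a single record (a quick check, not part of the original notebook):

# One tuple: (mc_1, mtc_1, mc_2, mtc_2, match_sim, cosine_sim)
print(joined_rdd.first())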

Result output

The similarity table created in this way is as follows.

sqlContext.createDataFrame(joined_rdd, ['tar_mc', 'tar_mtc', 'res_mc', 'res_mtc', 'match_sim', 'cosine_sim']).show()

+------+-------+------+-------+---------+--------------------+
|tar_mc|tar_mtc|res_mc|res_mtc|match_sim|          cosine_sim|
+------+-------+------+-------+---------+--------------------+
|   aaa|      a|   aaa|      b|      0.4|  0.2979433262317515|
|   aaa|      a|   bbb|      a|      0.2|  0.2161103600613806|
|   aaa|      a|   bbb|      b|      0.4|  0.6933162039799152|
|   aaa|      a|   ccc|      a|      0.0| 0.34941331375143353|
|   aaa|      a|   ccc|      b|      0.6|  0.5354750033557132|
|   aaa|      b|   aaa|      a|      0.4| 0.19428899651078324|
|   aaa|      b|   bbb|      a|      0.2| 0.10702152405150611|
|   aaa|      b|   bbb|      b|      0.2|  0.4033681950723296|
|   aaa|      b|   ccc|      a|      0.0| 0.20097172584128625|
|   aaa|      b|   ccc|      b|      0.4|  0.6861144738544892|
|   bbb|      a|   aaa|      a|      0.2|  0.3590385377694502|
|   bbb|      a|   aaa|      b|      0.2| 0.27266040008605663|
|   bbb|      a|   bbb|      b|      0.0|  1.1313716028957246|
|   bbb|      a|   ccc|      a|      0.4|0.009321106239696326|
|   bbb|      a|   ccc|      b|      0.0|  1.0017633803368193|
|   bbb|      b|   aaa|      a|      0.4|  0.2176828683879606|
|   bbb|      b|   aaa|      b|      0.2|   0.194213765887726|
|   bbb|      b|   bbb|      a|      0.0| 0.21381230488831227|
|   bbb|      b|   ccc|      a|      0.0| 0.21074015342537053|
|   bbb|      b|   ccc|      b|      0.6| 0.34536679942567616|
+------+-------+------+-------+---------+--------------------+
only showing top 20 rows

For each pair of keys, the categorical match similarity and the cosine similarity have been calculated.

In closing

This time, I used the Jupyter + Spark Docker image to build a sample that measures the similarity between records. There are surely more concise ways to write this (such as using MLlib's columnSimilarities), but for various reasons I ended up with this somewhat roundabout code. I'm still a beginner with both Docker and Spark, so I'd like to get more practice from here on.
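
For reference, here is a minimal sketch (not from the original notebook) of the columnSimilarities approach mentioned above. Note that RowMatrix.columnSimilarities computes cosine similarity between the columns of a distributed matrix, so the continuous features of the six samples are laid out as columns here:

from pyspark.mllib.linalg.distributed import RowMatrix

# Transpose so that each of the 6 samples becomes a column
# (the rows of the matrix are the 5 continuous features, label6-label10)
features = np.array([row[8:13] for row in samples]).T.tolist()
mat = RowMatrix(sc.parallelize(features))

# Upper-triangular CoordinateMatrix of pairwise cosine similarities between columns
for entry in mat.columnSimilarities().entries.collect():
    print(entry.i, entry.j, entry.value)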
