[PYTHON] Spark Dataframe Sample Code Collection

Introduction: What is Spark Dataframe?

Spark DataFrame was added in Spark 1.3.

In short, it lets you write simpler code that runs faster than the equivalent RDD `map` / `filter` chains. Assuming the raw pre-processing is done with RDDs, it is better to load the data into a DataFrame as early as possible. Since my DataFrame notes were scattered around, I am collecting the sample code here as a memorandum.

Load Sample Log

The subject is an access log. It is the access log (CSV) used in a book published by Gijutsu-Hyoronsha (Gihyo); the code below downloads the CSV file directly. Each row of the CSV holds three pieces of information: date, user ID, and campaign ID.

click.at	user.id	campaign.id
2015/4/27 20:40	144012	Campaign077
2015/4/27 0:27	24485	Campaign063
2015/4/27 0:28	24485	Campaign063
2015/4/27 0:33	24485	Campaign038

Read the CSV and turn it into an RDD. Drop the header in the first row and parse the first column as a datetime object.

import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

if not os.path.exists("./click_data_sample.csv"):
    print "csv file not found at master node, will download and copy to HDFS"
    commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
    commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")

whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x:x !=header).map(lambda line: line.split(","))\
            .map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])

whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]

How to create a Dataframe

**Created from RDD**

If you already have an RDD, you can create a DataFrame with `sqlContext.createDataFrame(my_rdd, my_schema)`, specifying the column names and their types (`TimestampType`, `IntegerType`, `StringType`, etc.) in a schema. See the Spark SQL documentation for how to define a schema (`StructType` / `StructField`).

`printSchema()` and `dtypes` show the schema information, `count()` returns the number of rows, and `show(n)` displays the first n records.

fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)

whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)

#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|

**Created directly from csv file**

To turn data read from a CSV file directly into a DataFrame, use spark-csv (databricks/spark-csv), one of the Spark Packages. By default every column is read as a string, but if you set the `inferSchema` option, the column types are inferred reasonably well.

whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)

#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#|           click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55|  24485|Campaign063|
#|2015-04-27 00:28:13|  24485|Campaign063|
#|2015-04-27 00:33:42|  24485|Campaign038|
#|2015-04-27 01:00:04|  24485|Campaign063|

whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()

#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)

By the way, having `.` in the column names is inconvenient, so you can rename them with `withColumnRenamed` (which creates another DataFrame with the renamed columns).

whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
                 .withColumnRenamed("user.id", "userID")\
                 .withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()

#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)

**Created directly from json**

Use `sqlContext.read.json` to load data from a JSON file directly into a DataFrame. Each line of the file is treated as one JSON object; if a key is missing in a line, the corresponding cell is set to null.

# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}

df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)

#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#|        access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55|       null| 24485|
#+-------------------+-----------+------+

**Created directly from parquet**

Use `sqlContext.read.parquet` to load data from a Parquet file directly into a DataFrame. If you specify the folder that contains the Parquet files, all Parquet files under that folder are read in one batch.

sqlContext.read.parquet("/user/hadoop/parquet_folder/")
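
For example, a minimal sketch of loading such a folder and checking what was read (the folder path is the same hypothetical path as above):

parquet_df = sqlContext.read.parquet("/user/hadoop/parquet_folder/")
print parquet_df.printSchema()
print parquet_df.count()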

Query with SQL statement

These samples query the DataFrame with SQL statements. If you register a table name for the DataFrame with `registerTempTable`, you can refer to it by that name in SQL. The return value of `sqlContext.sql(<SQL statement>)` is also a DataFrame.

Subqueries can be used as well, but be aware that if you do not give the subquery side an alias, you get a syntax error for some reason.

#Simple SQL query

whole_log_df.registerTempTable("whole_log_table")

print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+


#When putting variables in SQL statements
for count in range(1, 3):
    print "Campaign00" + str(count)
    print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()

#Campaign001
#+----------+
#|access_num|
#+----------+
#|      2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#|      1674|
#+----------+
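
The same loop can also be written with `str.format` instead of string concatenation, which keeps the SQL statement readable; a minimal sketch:

for count in range(1, 3):
    query = "SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00{0}'".format(count)
    print sqlContext.sql(query).show()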

#For Sub Query:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#|       20480|
#+------------+

Conditional search with **filter**, **select**

These are the simple search functions of a DataFrame. They are similar in function to the SQL queries above, but `filter` and `select` are positioned as lightweight lookups: `filter` extracts the rows that match a condition, and `select` extracts columns. Note that the syntax differs slightly from the RDD `filter`.

#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+

#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#|         access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+
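
Filter conditions can also be combined with `&` / `|` and chained with `select`; a minimal sketch using `pyspark.sql.functions.col` (the date range is just an example):

from pyspark.sql.functions import col

#Rows accessed on 2015-04-27 only, keeping two columns
print whole_log_df.filter((col("access_time") >= "2015-04-27") & (col("access_time") < "2015-04-28"))\
      .select("access_time", "userID").show(3)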

Aggregate by **groupBy**

`groupBy` provides functionality similar to RDD's `reduceByKey`. `groupBy` returns a `GroupedData` object, and calling one of its aggregation methods gives you the various aggregations; typical examples are `agg` and `count`.

Aggregate by **groupBy** → **count**

Run `groupBy` with `campaignID` as the key and count the number of records with `count()`. If you list multiple columns in `groupBy`, the combination of those columns becomes the grouping key.

print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+

print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292|  633|
#|Campaign086|107624|  623|
#|Campaign047|121150|  517|
#|Campaign086| 22975|  491|
#|Campaign122| 90714|  431|
#+-----------+------+-----+

Aggregate by **groupBy** → **agg**

You can run `groupBy` with `userID` as the key and compute the average, maximum, or minimum of the grouped results. `agg({key: value})` applies the `value` function (`min`, `sum`, `avg`, etc.) to the `key` column and returns the result. Since the return value is a DataFrame, you can narrow down the rows further with `.filter()`.

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID|    min(access_time)|
#+------+--------------------+
#|  4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
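
When you want several aggregations of the same column at once (which the `{key: value}` dict form cannot express), you can pass column expressions from `pyspark.sql.functions` to `agg`; a minimal sketch:

from pyspark.sql import functions as F

print whole_log_df.groupBy("userID")\
      .agg(F.min("access_time").alias("first_access"), F.max("access_time").alias("last_access"))\
      .show(3)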

Vertical / horizontal conversion with **groupBy** → **pivot**

`pivot` is a new feature in Spark 1.6 that provides functionality similar to SQL PIVOT. In the sample code below, pivoting reshapes the data from long (vertical) form to wide (horizontal) form as follows.

You must always chain three methods: `groupBy("column that stays as rows")`, then `pivot("column whose values become the new columns")`, then an aggregation such as `sum("column of values to aggregate")`.

agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)

#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107|    4|
#|103339|Campaign027|    1|
#|169114|Campaign112|    1|
#+------+-----------+-----+

#A Cell with no value will be null
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()

#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)

#When you want to fill a Cell with no value with 0
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)
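
If you already know which values you want as columns, `pivot` also accepts an explicit list of values, which skips the extra job that computes the distinct values; a minimal sketch (the campaign IDs listed are just examples):

pivot_df3 = agged_df.groupBy("userID").pivot("campaignID", ["Campaign001", "Campaign002", "Campaign003"]).sum("count").fillna(0)
print pivot_df3.printSchema()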

Add columns with a UDF

UDFs (user-defined functions) can be used with Spark DataFrames; I think their main use is to add columns. Since a DataFrame is basically immutable, you cannot change the contents of a column in place; instead you create another DataFrame with the new column added.

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def add_day_column(access_time):
    return int(access_time.strftime("%Y%m%d"))

my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)

#+--------------------+------+-----------+----------+
#|         access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077|  20150427|
#|2015-04-27 00:27:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:28:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:33:...| 24485|Campaign038|  20150427|
#|2015-04-27 01:00:...| 24485|Campaign063|  20150427|
#+--------------------+------+-----------+----------+

The UDF can also be written with a lambda function.

my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)

#+--------------------+------+-----------+--------+
#|         access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077|  144017|
#|2015-04-27 00:27:...| 24485|Campaign063|   24490|
#|2015-04-27 00:28:...| 24485|Campaign063|   24490|
#|2015-04-27 00:33:...| 24485|Campaign038|   24490|
#|2015-04-27 01:00:...| 24485|Campaign063|   24490|
#+--------------------+------+-----------+--------+
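
The same column can also be added with the `udf` helper from `pyspark.sql.functions`, which wraps a function in the same way as `UserDefinedFunction`; a minimal sketch:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

add_day_udf = udf(lambda t: int(t.strftime("%Y%m%d")), IntegerType())
print whole_log_df.withColumn("access_day", add_day_udf("access_time")).show(3)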

Conversely, use `df.drop()` to create a DataFrame with a particular column removed.

print whole_log_df.drop("userID").show(3)

#+--------------------+-----------+
#|         access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+

Join two Dataframes with Join

It is also possible to join two DataFrames. Here, consider extracting from the entire log only the entries of heavy users (users with 100 or more accesses).

First, aggregate with `.groupBy("userID")` to get the user IDs of users with 100 or more accesses together with their access counts.

heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1["count"] >= 100)

print heavy_user_df2.printSchema()
print heavy_user_df2.show(3)
print heavy_user_df2.count()

#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231|  134|
#| 13431|  128|
#|144432|  113|
#+------+-----+
#
#177

If you call the `join` method on the original DataFrame (which becomes the left side) and write the join condition against the join partner (which becomes the right side), the DataFrames are joined like a SQL join.

You should be able to choose `inner`, `outer`, `left_outer`, `right_outer`, etc. as the join type, but in my environment anything other than `inner` did not work as intended (it was processed as `outer`). For now, I join with `inner` and then remove the unnecessary columns with `drop`. Please refer to the official documentation for the detailed options.

With the following join, we can retrieve the 38,729 log rows belonging to the 177 users who have 100 or more accesses (the entire log has roughly 320,000 rows).

joined_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joined_df.printSchema()
print joined_df.show(3)
print joined_df.count()

#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)

#None
#+--------------------+-----------+------+
#|         access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729

Extract columns from Dataframe

print whole_log_df.columns
#['access_time', 'userID', 'campaignID']

print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]

print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]

Convert from Dataframe back to RDD / List

There are two main ways to convert a DataFrame back to an RDD.

#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

#convert to rdd by ".rdd" will return "Row" objects
print whole_log_df.groupBy("campaignID").count().rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]

#`.asDict()` converts each Row object into a dict, giving a Key-Value style RDD
print whole_log_df.groupBy("campaignID").count().rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]
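
If pandas is installed on the driver, `toPandas()` is another way to pull a small result back to the driver as a local object; a sketch, assuming pandas is available and the result fits in driver memory:

#convert a small aggregation result to a local pandas DataFrame
campaign_counts_pd = whole_log_df.groupBy("campaignID").count().toPandas()
print campaign_counts_pd.head(3)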

Export from Dataframe to Parquet file

If you export the DataFrame in Parquet format, the schema information is preserved in the file. Note that if the target directory in the S3 bucket already exists, the write fails, so specify a directory name that does not exist yet.

#write to parquet file
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export")

#reload from parquet file
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export")
print reload_df.printSchema()
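
If you want to overwrite an existing directory instead of failing, the save mode can be set on the writer; a minimal sketch (the bucket path is the same placeholder as above):

#overwrite the existing parquet directory instead of failing
whole_log_df.select("access_time", "userID").write.mode("overwrite").parquet("s3n://my_S3_bucket/parquet_export")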
