[PYTHON] PySpark data manipulation

This article summarizes the features and data manipulation of PySpark.

About PySpark

Features of PySpark (Spark)

Partitioning and Bucketing

An important concept from `Apache Hive` when operating PySpark.

For more information on Partitioning and Bucketing, see https://data-flair.training/blogs/hive-partitioning-vs-bucketing/.
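As a rough sketch (the paths and column names here are hypothetical), a partitioned write and a bucketed write look like this in PySpark:

# Partitioning: one folder per value of dt, e.g. .../dt=2020-01-01/
df.write.partitionBy("dt").parquet("s3://some-bucket/partitioned-data/")

# Bucketing: hash rows into a fixed number of buckets; requires saveAsTable
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_table")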

Checking the usage status of computational resources

If your processing is slow, you may want to use Ganglia to check how your compute resources are being used.

In particular, you will often find that network traffic (i.e., data transfer volume) is low while processing still takes a long time. In such cases, measures like the ones below may help.
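For example (a minimal sketch assuming a DataFrame df), one common check is whether the data is split into enough partitions, since too few partitions can leave most executors idle:

# Check the current number of partitions
print(df.rdd.getNumPartitions())

# Repartition before a heavy, shuffle-intensive step (200 is just an example value)
df = df.repartition(200)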

PySpark code snippet

The following variables are assumed to have been generated.
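Roughly, something like the following (a sketch based on the variable names used in the snippets; the bucket names and paths are placeholders):

from pyspark.sql import SparkSession

# SparkSession and SparkContext (already available on JupyterHub on EMR)
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Paths to data on S3 (placeholders)
path = "s3://some-bucket/data/dt=2020-01-01/"
paths = ["s3://some-bucket/data/dt=2020-01-01/", "s3://some-bucket/data/dt=2020-01-02/"]
parent_path = "s3://some-bucket/data/"

# A DataFrame read from those paths
df = spark.read.parquet(path)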

Caution

  1. The output displayed by df.show() is meant to give a general picture of the result, so it may not always be exact.
  2. Variables other than those listed above may appear without being introduced first.
  3. Paths start with s3:// because the code is assumed to run on AWS.
  4. We plan to add to and revise this article over time.
  5. **If you spot a mistake in the syntax or wording, please leave a comment.**

import

The following are the items most commonly imported when working with Spark.

# Some people use from pyspark.sql.functions import *,
# but I like to qualify functions with the F namespace because it is easier to understand.
# Note, however, that the single-letter alias F violates PEP 8...
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window

Execution environment settings

spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")

initialize spark

This is not necessary on `EMR`'s JupyterHub, but when running a Python script you need to initialize the `spark` instance (SparkSession) yourself.

from pyspark.sql import SparkSession

# spark initialization
spark = SparkSession.builder.appName("{your app name here}").getOrCreate()

Data reading

df = spark.read.parquet(path)
# Read all files under dt=2020-01-01/
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-01/*.parquet")

# Read all files from dt=2020-01-01/ through dt=2020-01-31/
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-*/*.parquet")
# Read the files at the paths contained in the paths list
df = spark.read.parquet(*paths)
# Read the partition columns in addition to the data columns
df = spark.read.option("basePath", parent_path).parquet(*paths)

Caching the result of lazy evaluation in memory enables faster processing. It is a good idea to cache() data that is used repeatedly, especially after heavy preprocessing.

# On-memory cache
df = df.cache()
# Or
# On-memory cache by default; the cache destination can be changed to disk etc. via optional arguments
df = df.persist()
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
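As noted in the comment above, persist() can take a storage level argument. A minimal sketch (MEMORY_AND_DISK spills to disk when the data does not fit in memory):

from pyspark import StorageLevel

# Cache in memory, spilling to disk if it does not fit
df = df.persist(StorageLevel.MEMORY_AND_DISK)

# Release the cache when it is no longer needed
df = df.unpersist()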

Data output

# csv (the header is not written in this case)
df.write.csv(path)

# parquet
df.write.parquet(path)
# For csv, the header is not written unless you set the header option
df.write.mode("overwrite").option("header", "True").csv(path)
# or
df.write.mode("overwrite").csv(path, header=True)
# For parquet, the column names are written by default even without a header option
df.write.parquet(path)
# gzip with csv
df.write.csv(path, compression="gzip")

# snappy with parquet (it should be snappy-compressed by default?)
df.write.option("compression", "snappy").parquet(path)

In the following example, the output is written under folders of the form /dt={dt_col}/count={count_col}/{file}.parquet.

df.repartition("dt", "count").write.partitionBy("dt", "count").parqeut(path)

Calling coalesce() after a long chain of transformations slows processing down, so when possible it is better to write the files out normally, read them back in, and then coalesce.

# May be slow after a long chain of transformations
df.coalesce(1).write.csv(path, header=True)

# Recommended when possible (write → read → write)
df.write.parquet(path)
alt_df = spark.read.parquet(path)
alt_df.coalesce(1).write.csv(path, header=True)

# You can also control the number of output files with repartition
df.repartition(20).write.parquet(path)

# Arguments that can be used in write.mode(): 'overwrite', 'append', 'ignore', 'error', 'errorifexists'
# I most often use overwrite
# Normally, an error occurs if files already exist in the output folder
df.write.parquet(path)

#If you want to overwrite
df.write.mode("overwrite").parquet(path)

# If you want to append to the existing folder
df.write.mode("append").parquet(path)

Data frame generation

This is how to create a DataFrame programmatically rather than by reading a file.

#Create a single column data frame
id_list = ["A001", "A002", "B001"]
df = spark.createDataFrame(id_list, StringType()).toDF("id")
# Each element is a tuple; the column names are specified at the end
df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

> df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

# =======================
# When creating via an RDD first
rdd = sc.parallelize(
    [
        (0, "A", 223, "201603", "PORT"), 
        (0, "A", 22, "201602", "PORT"), 
        (0, "A", 422, "201601", "DOCK"), 
        (1, "B", 3213, "201602", "DOCK"), 
        (1, "B", 3213, "201601", "PORT"), 
        (2, "C", 2321, "201601", "DOCK")
    ]
)
df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

> df_data.show()
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

Add column (withColumn ())

In PySpark, analysis often proceeds by repeatedly adding new columns.

# Create a new column called new_col_name and give it a literal (constant) value of 1
df = df.withColumn("new_col_name", F.lit(1))
# Add the path of the file each row was read from
df = df.withColumn("file_path", F.input_file_name())

# Get the file name from the file path
df = df.withColumn("file_name", F.split(F.col("file_path"), "/").getItem({int: last index value}))
# Specify the cast type with a string
df = df.withColumn("total_count", F.col("total_count").cast("double"))

# Specify the cast type with a PySpark type
df = df.withColumn("value", F.lit("1").cast(StringType()))
# If you want to add a column whose value depends on a condition
# F.when(condition, value).otherwise(else_value)
df = df.withColumn("is_even", F.when(F.col("number") % 2 == 0, 1).otherwise(0))

# For multiple conditions
df = df.withColumn("search_result", F.when( (F.col("id") % 2 == 0) & (F.col("room") % 2 == 0), 1).otherwise(0))

df = df.withColumn("is_touched", F.col("value").isNotNull())
df = df.withColumn("replaced_id", F.regexp_replace(F.col("id"), "A", "C"))
# date time -> epoch time
df = df.withColumn("epochtime", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ssZ"))

# epoch time -> date time
# 1555259647 -> 2019-04-14 16:34:07
df = df.withColumn("datetime", F.to_timestamp(df["epochtime"]))

# datetime -> string
# 2019-04-14 16:34:07 -> 2019-04-14
string_format = "yyyy-MM-dd"
df = df.withColumn("dt", F.date_format(F.col("datetime"), string_format))

# epoch time: a number of about 10 digits; seconds elapsed since January 1, 1970
df = df.withColumn("hour", F.hour(F.col("epochtime")))
df = df.withColumn("hour", F.hour(F.col("timestamp")))

# Truncate datetime to the specified unit (hour, week, ...)
df = df.withColumn("hour", F.date_trunc("hour", "datetime"))
df = df.withColumn("week", F.date_trunc("week", "datetime"))

There are many other functions that can be used with withColumn. Please also see the reference sites.

Combine data frames

To combine two DataFrames horizontally, use join(); to combine them vertically, use union().

# Specify the join column with on
df = left_df.join(right_df, on="id")

# When the join column has a different name in each DataFrame
df = left_df.join(right_df, left_df.id_1 == right_df.id_2)

# You can also specify the join type
# how: inner, left, right, left_semi, left_anti, cross, outer, full, left_outer, right_outer
df = left_df.join(right_df, on="id", how="inner")
# Join on multiple columns
df = left_df.join(right_df, on=["id", "dt"])
# Broadcast join: ship the small right_df to every executor
df = left_df.join(F.broadcast(right_df), on="id")
# Vertical concatenation
df = upper_df.union(bottom_df)

Column operation (rename, drop, select)

Renaming is often needed when reading a csv that has no header (column names).

# If there are no column names, the columns are named _c0 through _c{n}
df = df.withColumnRenamed("_c0", "id")
df = df.select("id")
df = df.select("id").distinct()
# Often used in combination with count()
# Example: number of unique ids
print(df.select("id").distinct().count())
df = df.drop("id")
# simple
df = df.dropna()
# using subset
df = df.na.drop(subset=["lat", "lon"])
# Simple case: collect the values of a column into a single list
df = df.select("id").select(F.collect_list("id"))
id_list = df.first()[0]

> id_list => ["A001", "A002", "B001"]

#Can also be used in combination with groupBy
df = df.groupBy("id").agg(F.collect_set("code"), F.collect_list("name"))

> df.show()
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

# Get a value from the DataFrame directly
df = df.groupBy().avg()
avg_attribute = df.collect()[0]

> print(avg_attribute["avg({col_name})"])
{averaged_value}

filter

You can use F.col() to apply a filter to a specific column.

# using spark.function
df = df.filter(F.col("id") == "A001")

# pandas-like
df = df.filter(df['id'] == "A001")
df = df.filter(df.id == "A001")

The isin() example below filters on membership in a list; however, if date_list is large, it is better to create a Spark DataFrame from date_list and join it instead.

df = df.filter(F.col("dt").isin(date_list))

orderBy

Sorting is not well suited to distributed processing, so it is better to avoid it where possible.

# Sort by a single column
df = df.orderBy("count", ascending=False)

#Multi-condition sort
df = df.orderBy(F.col("id").asc(), F.col("cound").desc())

groupBy (aggregate)

# count()
df = df.groupBy("id").count()

# multiple
# The column name is changed with the alias() function
# Example: aggregation per user
df = df.groupBy("id").agg(
  F.count(F.lit(1)).alias("count"),
  F.mean(F.col("diff")).alias("diff_mean"),
  F.stddev(F.col("diff")).alias("diff_stddev"),
  F.min(F.col("diff")).alias("diff_min"),
  F.max(F.col("diff")).alias("diff_max")
)

> df.show()
(output omitted)

# =======================
# Example: aggregation per user and date
df = df.groupBy("id", "dt").agg(
  F.count(F.lit(1)).alias("count")
  )
  
> df.show()
+---+-----------+------+
| id|         dt| count|
+---+-----------+------+
|  a| 2020/01/01|     7|
|  a| 2020/01/02|     5|
|  a| 2020/01/03|     4|
+---+-----------+------+

# ===========================
# Example: aggregation per user, date, and location
df = df.groupBy("id", "dt", "location_id").agg(
  F.count(F.lit(1)).alias("count")
  )

> df.show()
+---+-----------+------------+------+
| id|         dt| location_id| count|
+---+-----------+------------+------+
|  a| 2020/01/01|           A|     2|
|  a| 2020/01/01|           B|     3|
|  a| 2020/01/01|           C|     2|
:   :           :            :      :
+---+-----------+------------+------+
# Example: number of unique users per date
df = df.groupBy("dt").agg(F.countDistinct("id").alias("id_count"))

> df.show()
+-----------+---------+
|         dt| id_count|
+-----------+---------+
| 2020/01/01|        7|
| 2020/01/02|        5|
| 2020/01/03|        4|
+-----------+---------+

# ===============================
# Example: number of days on which each user appears at least once
df = df.groupBy("id").agg(F.countDistinct("dt").alias("dt_count"))

> df.show()
+---+---------+
| id| dt_count|
+---+---------+
|  a|       10|
|  b|       15|
|  c|        4|
+---+---------+
group_columns = ["id", "dt"]
df = ad_touched_visit_df.groupBy(*group_columns).count()

window function

w = Window().orderBy(F.col("id"))
df = df.withColumn("row_num", F.row_number().over(w))
#Add the data from the previous row as a column
w = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn("prev_timestamp", F.lag(df["timestamp"]).over(w))

loop processing

Loops are strongly discouraged because they do not fit the distributed processing model. Use them only when a for loop is truly unavoidable.

for row in df.rdd.collect():
  do_some_with(row['id'])
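If you really do have to iterate over rows on the driver, toLocalIterator() is a slightly gentler option than collect() because it does not pull the whole DataFrame into driver memory at once (a minimal sketch):

# Stream partitions to the driver one at a time instead of collecting everything
for row in df.toLocalIterator():
  do_some_with(row['id'])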

Reference site
