PySpark Cheat Sheet [Python]

Preface

This article is based on the PySpark 3.0.1 documentation. Knowing how the commonly used functions behave should make it quicker to settle on an implementation approach.

Introduction

JupyterLab PySpark Kernel

First, create an environment for running PySpark with GCP's Dataproc.

gcloud dataproc clusters create <cluster name> --enable-component-gateway --region <region> --zone <zone> \
--master-machine-type n1-standard-2 --master-boot-disk-size 500 --num-workers 2 \
--worker-machine-type n1-standard-2 --worker-boot-disk-size 500 --image-version 1.4-debian10 --optional-components ANACONDA,JUPYTER \
--scopes https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/devstorage.full_control --project <project id>

--enable-component-gateway is set so that JupyterLab can be selected from the cluster's web UI. For the data, use the Titanic data from Kaggle (https://www.kaggle.com/c/titanic).

Import libraries

import pandas as pd
import numpy as np
import sys
import time

from google.cloud import storage as gcs
from io import BytesIO
from fs_gcsfs import GCSFS
from fs import open_fs
import gcsfs

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql import SQLContext
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sys.path
>>
['/opt/conda/anaconda/lib/python36.zip',
 '/opt/conda/anaconda/lib/python3.6',
 '/opt/conda/anaconda/lib/python3.6/lib-dynload',
 '',
 '/opt/conda/anaconda/lib/python3.6/site-packages',
 '/usr/lib/spark/python',
 '/opt/conda/anaconda/lib/python3.6/site-packages/IPython/extensions',
 '/root/.ipython']
# Named gcs_fs so that the imported gcsfs module (used further below) is not shadowed
gcs_fs = GCSFS(bucket_name = "<bucket_name>")
gcs_fs.tree()
>>
|-- .ipynb_checkpoints
|   -- error (resource '/.ipynb_checkpoints' not found)
|-- google-cloud-dataproc-metainfo
|   -- error (resource '/google-cloud-dataproc-metainfo' not found)
|-- notebooks
|   -- jupyter
|       -- Untitled.ipynb
-- titanic_data.csv

Load CSV data (as a Pandas DataFrame) from Cloud Storage

bucket_name = "<bucket_name>"
file_name = "titanic_data.csv"
project_id = "<project_id>"
# with Pandas
titanic_data = pd.read_csv("<Cloud Storage Path>")
# with gcsfs
fs = gcsfs.GCSFileSystem(project = project_id)
with fs.open('{}/titanic_data.csv'.format(bucket_name)) as file:
    titanic_data = pd.read_csv(file)
# with gcs
client = gcs.Client()
bucket = client.get_bucket(bucket_name)

blob = gcs.Blob(file_name, bucket)
content = blob.download_as_string()
titanic_data = pd.read_csv(BytesIO(content))

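Write a PySpark DataFrame (as a Pandas DataFrame) to Cloud Storage

# titanic_data must be a Spark DataFrame here (see the Spark DataFrame section); toPandas() collects all rows to the driver
titanic_data.toPandas().to_csv('gs://pyspark_output/output.csv', header = True)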

Spark DataFrame

# spark = SparkSession.builder.appName("titanic_data_").getOrCreate()

schema = StructType([
  StructField("PassengerId", IntegerType(), True),
  StructField("Survived", IntegerType(), True),
  StructField("Pclass", IntegerType(), True),
  StructField("Name", StringType(), True),
  StructField("Sex", StringType(), True),
  StructField("Age", DoubleType(), True),
  StructField("SibSp", IntegerType(), True),
  StructField("Parch", IntegerType(), True),
  StructField("Ticket", StringType(), True),
  StructField("Fare", DoubleType(), True),
  StructField("Cabin", StringType(), True),
  StructField("Embarked", StringType(), True),  
])

titanic_data = spark.read.format("com.databricks.spark.csv").options(header = "true").load("gs://{}/titanic_data.csv".format(bucket_name), schema = schema)
# titanic_data = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/titanic_data.csv".format(bucket_name), schema = schema)
titanic_data.show(5)
>>
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows

Drop the rows in which Embarked is missing.

titanic_data = titanic_data.filter(F.col('Embarked').isNotNull())
titanic_data.filter(F.col('Embarked').isNull()).count()
>> 0
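# Note: the outputs below also list Embarked_labeled and Age_cat, which are not in the schema defined above; they appear to be left over from a later notebook state.
titanic_data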
>> DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, Embarked_labeled: string, Age_cat: string]
titanic_data.count()
>> 889
titanic_data.printSchema()
>>
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Embarked_labeled: string (nullable = true)
 |-- Age_cat: string (nullable = true)
titanic_data.dtypes
>>
[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Embarked_labeled', 'string'),
 ('Age_cat', 'string')]

DataFrame ⇔ RDD

# DataFrame -> RDD
titanic_data.rdd

# DataFrame <- RDD
spark.createDataFrame(titanic_data.rdd)

SparkSQL

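# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
titanic_data.createOrReplaceTempView('titanic_data_table')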

spark.sql("select PassengerID, Embarked from titanic_data_table where Embarked = 'S' ").toPandas().iloc[0:5]
>>
  PassengerID  Embarked
0     1     S
1     3     S
2     4     S
3     5     S
4     7     S

spark.sql("select age, \
case when age <= 12 then 'C' \
when age between 13 and 19 then 'T' \
when age between 20 and 25 then '1' \
when age between 26 and 34 then '2' \
when age between 35 and 49 then '3' \
when age >= 50 then '4' end as age_cat \
from titanic_data_table").toPandas().iloc[0:5]
>>
     age  age_cat
0     22     1
1     38     3
2     26     2
3     35     3
4     35     3
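
The CASE expression above leaves gaps between its integer boundaries, so a fractional age such as 12.5 matches no branch and yields NULL, as does a missing age. A minimal sketch of that behavior follows. It runs the same expression against an in-memory SQLite table as a stand-in for Spark SQL; the table name passengers and the helper age_categories are made up for the illustration.

```python
import sqlite3

# Same CASE expression as in the Spark SQL query, run on SQLite (a stand-in, not Spark).
AGE_CAT_SQL = """
select age,
       case when age <= 12 then 'C'
            when age between 13 and 19 then 'T'
            when age between 20 and 25 then '1'
            when age between 26 and 34 then '2'
            when age between 35 and 49 then '3'
            when age >= 50 then '4' end as age_cat
from passengers
order by rowid
"""

def age_categories(ages):
    """Return (age, age_cat) pairs for the given ages (hypothetical helper)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table passengers (age real)")
    conn.executemany("insert into passengers values (?)", [(a,) for a in ages])
    rows = conn.execute(AGE_CAT_SQL).fetchall()
    conn.close()
    return rows

result = age_categories([22, 38, 12.5, None])
print(result)
```

If every age should land in a bucket, half-open ranges (for example age < 13, then age < 20, and so on) would close the gaps.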

Functions

Max / Min / Avg (or Mean) / Std / Sum / Count

titanic_data.agg({'Fare': 'max'}).collect()
>> [Row(max(Fare)=512.3292)]
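
The heading lists six aggregates, but only max is demonstrated. As a rough plain-Python stand-in (not PySpark), the sketch below computes all six on the five Fare values shown earlier. It assumes that Spark's stddev is the sample standard deviation, since pyspark.sql.functions.stddev is documented as an alias for stddev_samp; that is why statistics.stdev is used rather than pstdev.

```python
import statistics

# Fare values of the first five rows shown by titanic_data.show(5)
fares = [7.25, 71.2833, 7.925, 53.1, 8.05]

aggregates = {
    "max": max(fares),
    "min": min(fares),
    "avg": statistics.mean(fares),
    "stddev": statistics.stdev(fares),  # sample standard deviation (divides by n - 1)
    "sum": sum(fares),
    "count": len(fares),
}
print(aggregates)
```

In PySpark, the dict form of agg takes one aggregate name per column, so several aggregates on the same column are usually written with the functions in F instead.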

Join

join(other, on=None, how=None): joins with another DataFrame, using the given join expression.

Parameters:

other – right side of the join.

on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

how – str, default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

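# Assumption: titanic_data_ (not defined in the original) is a pandas copy of titanic_data
titanic_data_ = titanic_data.toPandas()
random_number_pddf = pd.DataFrame(np.random.randint(100, size = 889)[:, np.newaxis], columns = ['random_number'])
data_for_join_ = pd.concat([titanic_data_.PassengerId, random_number_pddf], axis = 1)
data_for_join = spark.createDataFrame(data_for_join_)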

# how = 'inner'
data_join = titanic_data.join(data_for_join, titanic_data.PassengerId == data_for_join.PassengerId, how = 'inner').drop(data_for_join.PassengerId)
data_join.printSchema()
>> 
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Embarked_labeled: string (nullable = true)
 |-- Age_cat: string (nullable = true)
 |-- random_number: double (nullable = true)
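
To make the how parameter more concrete, here is a small plain-Python sketch (not PySpark) of three of the listed join types on a single key column. The helper join_rows and the sample rows are hypothetical; only inner, left and left_anti are covered.

```python
def join_rows(left, right, key, how="inner"):
    """Equi-join two lists of dicts on one key column (illustrative sketch only)."""
    if how not in ("inner", "left", "left_anti"):
        raise ValueError("this sketch only covers inner, left and left_anti")
    right_by_key = {}
    for row in right:
        right_by_key.setdefault(row[key], []).append(row)
    # Columns that exist only on the right side, used to pad unmatched left rows
    right_cols = [c for c in (right[0] if right else {}) if c != key]

    joined = []
    for lrow in left:
        matches = right_by_key.get(lrow[key], [])
        if how == "left_anti":
            if not matches:              # keep only left rows without a partner
                joined.append(dict(lrow))
        elif matches:                    # inner and left: one output row per match
            joined.extend({**lrow, **rrow} for rrow in matches)
        elif how == "left":              # left: keep unmatched rows, pad with None
            joined.append({**lrow, **{c: None for c in right_cols}})
    return joined

left = [{"PassengerId": 1, "Embarked": "S"},
        {"PassengerId": 2, "Embarked": "C"},
        {"PassengerId": 3, "Embarked": "S"}]
right = [{"PassengerId": 1, "random_number": 42},
         {"PassengerId": 3, "random_number": 7}]

inner = join_rows(left, right, "PassengerId", how="inner")
left_join = join_rows(left, right, "PassengerId", how="left")
anti = join_rows(left, right, "PassengerId", how="left_anti")
print(len(inner), len(left_join), len(anti))
```

Passenger 2 has no partner on the right side, so it is dropped by inner, kept with a None value by left, and is the only row returned by left_anti.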

groupBy

time_ = time.time()

titanic_data.groupby('Embarked', 'Survived').count().sort('count', ascending = False).show(5)

time_ = time.time() - time_
>>
+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       S|       0|  427|
|       S|       1|  217|
|       C|       1|   93|
|       C|       0|   75|
|       Q|       0|   47|
+--------+--------+-----+
only showing top 5 rows

np.round(time_, 3)
>> 2.426
time_ = time.time()

spark.sql("select Embarked, Survived, \
count(*) as per_Embarked_cat \
from titanic_data_table \
group by Embarked, Survived order by per_Embarked_cat desc").show(5)

time_ = time.time() - time_
>>
+--------+--------+----------------+
|Embarked|Survived|per_Embarked_cat|
+--------+--------+----------------+
|       S|       0|             427|
|       S|       1|             217|
|       C|       1|              93|
|       C|       0|              75|
|       Q|       0|              47|
+--------+--------+----------------+
only showing top 5 rows

np.round(time_, 3)
>> 1.794
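
The two timings above come from a single run each, so the difference may reflect warm-up or caching rather than the API used. A more cautious comparison repeats each measurement. The helper below is a hypothetical sketch of that idea; the workload is a plain-Python stand-in, and with Spark it would be a callable that triggers an action such as collect() or show().

```python
import time

def best_of(fn, repeats=3):
    """Run fn several times and return the shortest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)

# Stand-in workload; with Spark this could be, for example,
# lambda: titanic_data.groupby('Embarked', 'Survived').count().collect()
elapsed = best_of(lambda: sum(range(100_000)), repeats=3)
print(round(elapsed, 3))
```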

Drop

titanic_data.select('Pclass', 'Age', 'Name').drop('Name').show(5)
>>
+------+----+
|Pclass| Age|
+------+----+
|     3|22.0|
|     1|38.0|
|     3|26.0|
|     1|35.0|
|     3|35.0|
+------+----+
only showing top 5 rows

Duplicate

titanic_data.select('Pclass', 'Age', 'Name').show(10)
>>
+------+----+--------------------+
|Pclass| Age|                Name|
+------+----+--------------------+
|     3|22.0|Braund, Mr. Owen ...|
|     1|38.0|Cumings, Mrs. Joh...|
|     3|26.0|Heikkinen, Miss. ...|
|     1|35.0|Futrelle, Mrs. Ja...|
|     3|35.0|Allen, Mr. Willia...|
|     3|null|    Moran, Mr. James|
|     1|54.0|McCarthy, Mr. Tim...|
|     3| 2.0|Palsson, Master. ...|
|     3|27.0|Johnson, Mrs. Osc...|
|     2|14.0|Nasser, Mrs. Nich...|
+------+----+--------------------+
only showing top 10 rows

titanic_data.select('Name').distinct().count()
>> 889
titanic_data.select('Name').count()
>> 889


titanic_data.dropDuplicates(['Name']).select('Pclass', 'Age', 'Name').show(10)
>>
+------+----+--------------------+
|Pclass| Age|                Name|
+------+----+--------------------+
|     2|40.0|Watt, Mrs. James...|
|     1|36.0|Young, Miss. Mari...|
|     1|null|Parr, Mr. William...|
|     3|19.0|Soholt, Mr. Peter...|
|     3|31.0|Goldsmith, Mrs. F...|
|     3|42.0|    Dimic, Mr. Jovan|
|     1|48.0|Harper, Mr. Henry...|
|     1|38.0|Reuchlin, Jonkhee...|
|     2|18.0|Fahlstrom, Mr. Ar...|
|     2|42.0|Hosono, Mr. Masabumi|
+------+----+--------------------+
only showing top 10 rows

titanic_data.dropDuplicates(['Name']).select('Pclass', 'Age', 'Name').count()
>> 889
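
Because every Name in this data set is unique, the counts above do not change. The following plain-Python sketch (not PySpark) shows what dropDuplicates with a column subset does when duplicates exist. The helper drop_duplicates and the duplicate row are made up for the illustration, and the sketch keeps the first row it sees, whereas Spark, as far as I know, does not promise which of the duplicate rows survives.

```python
def drop_duplicates(rows, subset):
    """Keep one row per distinct combination of the subset columns."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

rows = [
    {"Pclass": 3, "Name": "Braund, Mr. Owen Harris"},
    {"Pclass": 1, "Name": "Braund, Mr. Owen Harris"},  # hypothetical duplicate name
    {"Pclass": 3, "Name": "Moran, Mr. James"},
]

by_name = drop_duplicates(rows, ["Name"])
by_class_and_name = drop_duplicates(rows, ["Pclass", "Name"])
print(len(by_name), len(by_class_and_name))
```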

Explode / Split / Regexp Replace

explode: Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.

split: Splits str around matches of the given pattern.

regexp_replace: Replace all substrings of the specified string value that match regexp with rep.

titanic_data.select('name').show(5)
>>
+--------------------+
|                name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows


# Raw string so that Python does not treat \[ and \] as escape sequences
titanic_data.withColumn('Name', explode(split(regexp_replace(F.col('name'), r"(^\[)|(\]$)", ""), ", "))).select('name').show(5)
>>
+--------------------+
|                name|
+--------------------+
|              Braund|
|     Mr. Owen Harris|
|             Cumings|
|Mrs. John Bradley...|
|           Heikkinen|
+--------------------+
only showing top 5 rows
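
The three steps can be traced in plain Python (a stand-in, not PySpark): the regular expression strips a leading "[" and a trailing "]", the split cuts at ", ", and the explode step turns each list element into its own row. The helper name split_and_explode and the bracketed sample string are made up for the illustration.

```python
import re

def split_and_explode(names):
    """Mimic explode(split(regexp_replace(name, pattern, ''), ', ')) on a list of strings."""
    rows = []
    for name in names:
        cleaned = re.sub(r"(^\[)|(\]$)", "", name)  # drop surrounding brackets, if any
        for part in cleaned.split(", "):            # one output row per element
            rows.append(part)
    return rows

exploded = split_and_explode(["Braund, Mr. Owen Harris", "[a, b]"])
print(exploded)
```

The Titanic names contain no brackets, so the regexp_replace step changes nothing for them; it only matters for strings like the second sample.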

Window-function

The following sections cover window functions.

from pyspark.sql.window import Window
from pyspark.sql.functions import *

Rank

window_ = Window.partitionBy('Embarked').orderBy('Fare') 

titanic_data.withColumn('rank', rank().over(window_)).select('Embarked', 'Fare', 'rank').show(5)
>>
+--------+------+----+
|Embarked|  Fare|rank|
+--------+------+----+
|       Q|  6.75|   1|
|       Q|  6.75|   1|
|       Q|6.8583|   3|
|       Q|  6.95|   4|
|       Q|7.6292|   5|
+--------+------+----+
only showing top 5 rows

Percent Rank

window_ = Window.partitionBy('Embarked').orderBy('Fare')

titanic_data.withColumn('percent_rank', percent_rank().over(window_)).select('Embarked', 'Fare', F.round('percent_rank', 2).alias('percent_rank')).show(5)
>>
+--------+------+------------+
|Embarked|  Fare|percent_rank|
+--------+------+------------+
|       Q|  6.75|         0.0|
|       Q|  6.75|         0.0|
|       Q|6.8583|        0.03|
|       Q|  6.95|        0.04|
|       Q|7.6292|        0.05|
+--------+------+------------+
only showing top 5 rows

Dense Rank

window_ = Window.partitionBy('Embarked').orderBy('Fare')

titanic_data.withColumn('dense_rank', dense_rank().over(window_)).select('Embarked', 'Fare', 'dense_rank').show(5)
>>
+--------+------+----------+
|Embarked|  Fare|dense_rank|
+--------+------+----------+
|       Q|  6.75|         1|
|       Q|  6.75|         1|
|       Q|6.8583|         2|
|       Q|  6.95|         3|
|       Q|7.6292|         4|
+--------+------+----------+
only showing top 5 rows

Row Number

window_ = Window.partitionBy('Embarked').orderBy('Fare') 

titanic_data.withColumn('row_number', row_number().over(window_)).select('Embarked', 'Fare', 'row_number').show(5)
>>
+--------+------+----------+
|Embarked|  Fare|row_number|
+--------+------+----------+
|       Q|  6.75|         1|
|       Q|  6.75|         2|
|       Q|6.8583|         3|
|       Q|  6.95|         4|
|       Q|7.6292|         5|
+--------+------+----------+
only showing top 5 rows
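
rank, dense_rank and row_number differ only in how they treat ties. The plain-Python sketch below (not PySpark; the helper ranking_columns is hypothetical) computes all three for the five smallest fares of the Q partition and reproduces the values shown in the tables above.

```python
def ranking_columns(sorted_values):
    """Compute row_number, rank and dense_rank for an already sorted list."""
    rows = []
    rank = dense = 0
    prev = object()  # sentinel that never equals a real value
    for i, value in enumerate(sorted_values, start=1):
        if value != prev:
            rank = i      # rank jumps to the current position, leaving gaps after ties
            dense += 1    # dense_rank just counts distinct values, no gaps
            prev = value
        rows.append({"value": value, "row_number": i, "rank": rank, "dense_rank": dense})
    return rows

fares = [6.75, 6.75, 6.8583, 6.95, 7.6292]
ranked = ranking_columns(fares)
print([(r["row_number"], r["rank"], r["dense_rank"]) for r in ranked])
```

Since row_number breaks ties arbitrarily, which of the two 6.75 rows receives 1 and which receives 2 is not determined by the window ordering alone.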

Cume Dist

window_ = Window.partitionBy('Embarked').orderBy('Fare')

titanic_data.withColumn('cumulative_dist', cume_dist().over(window_)).select('Embarked', 'Fare', F.round('cumulative_dist', 2).alias('cumulative_dist')).show(10)
>>
+--------+------+---------------+
|Embarked|  Fare|cumulative_dist|
+--------+------+---------------+
|       Q|  6.75|           0.03|
|       Q|  6.75|           0.03|
|       Q|6.8583|           0.04|
|       Q|  6.95|           0.05|
|       Q|7.6292|           0.06|
|       Q| 7.725|           0.08|
|       Q|7.7292|           0.09|
|       Q|7.7333|           0.14|
|       Q|7.7333|           0.14|
|       Q|7.7333|           0.14|
+--------+------+---------------+
only showing top 10 rows
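
Both percent_rank and cume_dist follow the usual SQL definitions: percent_rank is (rank - 1) / (rows in partition - 1), and cume_dist is the share of rows in the partition whose value is less than or equal to the current one. A plain-Python sketch (not PySpark; the helper name is hypothetical) on a five-row partition:

```python
from bisect import bisect_left, bisect_right

def percent_rank_and_cume_dist(sorted_values):
    """Return (value, percent_rank, cume_dist) for an already sorted partition."""
    n = len(sorted_values)
    rows = []
    for value in sorted_values:
        rank = bisect_left(sorted_values, value) + 1        # ties share the lowest position
        percent_rank = (rank - 1) / (n - 1) if n > 1 else 0.0
        cume_dist = bisect_right(sorted_values, value) / n  # rows with value <= current
        rows.append((value, percent_rank, cume_dist))
    return rows

fares = [6.75, 6.75, 6.8583, 6.95, 7.6292]
stats = percent_rank_and_cume_dist(fares)
print(stats)

# Cross-check against the tables above: the Q partition has 889 - 644 - 168 = 77 rows
# (from the groupBy counts), and the third row has rank 3.
q_percent_rank = round((3 - 1) / (77 - 1), 2)
q_cume_dist = round(3 / 77, 2)
print(q_percent_rank, q_cume_dist)
```

The cross-check gives 0.03 and 0.04, which agrees with the third row of the percent_rank and cume_dist outputs.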

Lead

window_ = Window.partitionBy('Embarked').orderBy('Fare')

titanic_data.withColumn('lead', lead('Fare', 2).over(window_)).select('Embarked', 'Fare', 'lead').show(5)
>>
+--------+------+------+
|Embarked|  Fare|  lead|
+--------+------+------+
|       Q|  6.75|6.8583|
|       Q|  6.75|  6.95|
|       Q|6.8583|7.6292|
|       Q|  6.95| 7.725|
|       Q|7.6292|7.7292|
+--------+------+------+
only showing top 5 rows

Lag

window_ = Window.partitionBy('Embarked').orderBy('Fare')

titanic_data.withColumn('lag', lag('Fare', 2).over(window_)).select('Embarked', 'Fare', 'lag').show(5)
>>
+--------+------+------+
|Embarked|  Fare|   lag|
+--------+------+------+
|       Q|  6.75|  null|
|       Q|  6.75|  null|
|       Q|6.8583|  6.75|
|       Q|  6.95|  6.75|
|       Q|7.6292|6.8583|
+--------+------+------+
only showing top 5 rows
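
lead and lag look a fixed number of rows ahead or behind within the ordered partition and return null where no such row exists. A plain-Python sketch (not PySpark; the helpers lead_values and lag_values are hypothetical) with offset 2, using the first seven fares of the Q partition:

```python
def lead_values(values, offset=1, default=None):
    """Value `offset` rows after each row, or default past the end."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]

def lag_values(values, offset=1, default=None):
    """Value `offset` rows before each row, or default before the start."""
    return [values[i - offset] if i - offset >= 0 else default for i in range(len(values))]

fares = [6.75, 6.75, 6.8583, 6.95, 7.6292, 7.725, 7.7292]
lead2 = lead_values(fares, 2)
lag2 = lag_values(fares, 2)
print(lead2[:5])
print(lag2[:5])
```

The first five entries of each list match the lead and lag columns in the tables above.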

Aggregate

window_ = Window.partitionBy('Embarked').orderBy('Fare')
window_agg = Window.partitionBy('Embarked')

titanic_data.withColumn('row', row_number().over(window_))\
.withColumn('avg', avg(F.col('Fare')).over(window_agg))\
.withColumn('max', max(F.col('Fare')).over(window_agg))\
.select('Embarked', 'row', 'avg', 'max').show(5)
>>
+--------+---+------------------+----+
|Embarked|row|               avg| max|
+--------+---+------------------+----+
|       Q|  1|13.276029870129872|90.0|
|       Q|  2|13.276029870129872|90.0|
|       Q|  3|13.276029870129872|90.0|
|       Q|  4|13.276029870129872|90.0|
|       Q|  5|13.276029870129872|90.0|
+--------+---+------------------+----+
only showing top 5 rows
titanic_data.withColumn('row', row_number().over(window_))\
.withColumn('avg', avg(F.col('Fare')).over(window_agg).alias('avg'))\
.withColumn('max', max(F.col('Fare')).over(window_agg)).where(F.col('row') == 1)\
.select('Embarked', F.round(F.col('avg'), 2).alias('Fare_avg'), F.round(F.col('max'), 2).alias('Fare_max'))\
.sort('Fare_avg', ascending = False).show()
>>
+--------+--------+--------+
|Embarked|Fare_avg|Fare_max|
+--------+--------+--------+
|       C|   59.95|  512.33|
|       S|   27.08|   263.0|
|       Q|   13.28|    90.0|
+--------+--------+--------+
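
Unlike groupBy, an aggregate over a window without orderBy keeps every input row and attaches the partition-level result to each of them, which is why avg and max repeat on every Q row above. A plain-Python sketch of that behavior (not PySpark; the helper and the three sample rows are made up for the illustration):

```python
from collections import defaultdict

def add_partition_aggregates(rows, partition_col, value_col):
    """Attach the per-partition average and maximum to every row."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row[value_col])
    out = []
    for row in rows:
        values = groups[row[partition_col]]
        out.append({**row, "avg": sum(values) / len(values), "max": max(values)})
    return out

rows = [
    {"Embarked": "Q", "Fare": 6.75},
    {"Embarked": "Q", "Fare": 90.0},
    {"Embarked": "S", "Fare": 8.05},
]
with_aggs = add_partition_aggregates(rows, "Embarked", "Fare")
print(with_aggs)
```

The second query above then filters on row == 1 to collapse each partition back to a single row, which gives the same shape as a groupBy result.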

UserDefinedFunction

UserDefinedFunction【PySpark】

# from pyspark.sql.functions import UserDefinedFunction
# from pyspark.sql import SQLContext, Row
# from pyspark.sql.types import *

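def LabelEncoder(x):
    # Map the port of embarkation to an integer label
    if x == 'S':
        x_ = 0
    elif x == 'C':
        x_ = 1
    elif x == 'Q':
        x_ = 2
    else:
        # Any other value (including None) would otherwise leave x_ unassigned
        x_ = None

    return x_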

udf_label_Encoder = UserDefinedFunction(LabelEncoder)
titanic_data.filter('Age > 12').withColumn('Embarked_label', udf_label_Encoder(F.col('Embarked'))).select('PassengerId', 'Embarked_label').show(5)
# titanic_data.filter('Age > 12').withColumn('Embarked_label', udf_label_Encoder('Embarked')).select('PassengerId', 'Embarked_label').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
|          1|             0|
|          2|             1|
|          3|             0|
|          4|             0|
|          5|             0|
+-----------+--------------+
only showing top 5 rows
titanic_data.select('PassengerId', udf_label_Encoder('Embarked').alias('Embarked_label')).filter('PassengerId >= 2').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
|          2|             1|
|          3|             0|
|          4|             0|
|          5|             0|
|          6|             2|
+-----------+--------------+
only showing top 5 rows
# from pyspark.sql.functions import when

# otherwise('2') so that Q maps to 2, as in the output below and in LabelEncoder
titanic_data.withColumn('Embarked', when(F.col('Embarked') == 'S', '0').when(F.col('Embarked') == 'C', '1').otherwise('2'))\
.withColumnRenamed('Embarked', 'Embarked_label').select('PassengerId', 'Embarked_label').filter('PassengerId >= 2').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
|          2|             1|
|          3|             0|
|          4|             0|
|          5|             0|
|          6|             2|
+-----------+--------------+
only showing top 5 rows
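
The label mapping itself can also be written as a dictionary lookup, which returns None for unexpected or missing values without needing an explicit branch. This is a plain-Python sketch; the names EMBARKED_LABELS and label_encode are hypothetical, and the function could be wrapped in a UDF in the same way as LabelEncoder. Note that UserDefinedFunction uses a string return type when none is given, so the labels come back as strings unless a return type such as IntegerType() is passed.

```python
# Hypothetical dictionary-based variant of the LabelEncoder function
EMBARKED_LABELS = {"S": 0, "C": 1, "Q": 2}

def label_encode(value):
    """Return the integer label for a port code, or None if it is unknown."""
    return EMBARKED_LABELS.get(value)

labels = [label_encode(v) for v in ["S", "C", "Q", None, "X"]]
print(labels)
```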

UserDefinedFunction【SparkSQL】

Registering the Python function with spark.udf.register makes it callable from Spark SQL on the DataFrame's temporary view. The example also sets the session time zone, which the UDF itself does not depend on.

spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")

titanic_data.createOrReplaceTempView('titanic_data_table')

spark.udf.register('LabelEncoder_', LabelEncoder)

spark.sql('''select PassengerId, LabelEncoder_(Embarked) as Embarked_label from titanic_data_table''').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
|          1|             0|
|          2|             1|
|          3|             0|
|          4|             0|
|          5|             0|
+-----------+--------------+
only showing top 5 rows

Summary

PySpark offers a wealth of functions. I have tried to summarize some of them here.

Reference URL

PySpark 3.0.1 documentation
