[PYTHON] Spark API cheat sheet

Purpose

This is a note of Spark's frequently used APIs (mainly for myself) so that I can get back up to speed quickly even after a long break from development. For now it covers only the Python version (I may add a Scala version if I find the time).

**This cheat sheet is just a cheat sheet** (some arguments may be omitted), so if you have time, please check the [official API documentation (Spark Python API Docs)](http://spark.apache.org/docs/latest/api/python/index.html).

Spark API cheat sheet (Python)

The examples below assume the following setup:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

RDD

Create RDD (read data)

parallelize

sc.parallelize(collection) creates an RDD from a list or tuple.



```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
```

textFile

sc.textFile(file) reads a text file into an RDD, one line per element. Wildcards (glob patterns) can also be used in the path.



```py
>>> rdd = sc.textFile("./words.txt")
```
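For example, a glob pattern can read several files into one RDD (a minimal sketch; `./data/*.txt` is a hypothetical path):

```py
>>> # Read every .txt file under ./data into a single RDD (hypothetical path)
>>> rdd = sc.textFile("./data/*.txt")
```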

wholeTextFiles

sc.wholeTextFiles(directory) reads every file in the directory, turning each whole file into a single RDD element as a (filename, content) pair.



```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.wholeTextFiles("./")
```

Action

Transformations are only executed, in order, when an action is finally called (lazy evaluation).
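A minimal sketch of this lazy behavior: building the transformation does no work; only the action triggers computation.

```py
>>> rdd = sc.parallelize([1, 2, 3])
>>> mapped = rdd.map(lambda x: x * 10)  # nothing runs yet, only the lineage is recorded
>>> mapped.collect()                    # the action triggers the actual computation
[10, 20, 30]
```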

Actions that return elements

collect

collect() returns all elements.

>>> print(rdd.collect())
[1, 2, 3, 4, 5]

take

take(n) returns the first n elements.

>>> print(rdd.take(3))
[1, 2, 3]

first

first() returns the very first element.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1

top

top(n) returns the n largest elements, in descending order.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]

Actions that return a (statistical) value

count

count() returns the number of elements.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3

mean

mean() returns the mean of the elements.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
2.0

sum

sum() returns the sum of the elements.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6

variance

variance() returns the variance of the elements.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666

stdev

stdev() returns the standard deviation of the elements.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726

Actions that save data

saveAsTextFile

saveAsTextFile(path) saves the RDD as text files. Note that the path becomes a directory containing the partition files.

>>> rdd.saveAsTextFile("./a.txt")
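Since the output path becomes a directory of part files, it can be read back with `textFile` (a minimal sketch reusing the path from above):

```py
>>> rdd2 = sc.textFile("./a.txt")  # reads all part files saved in the ./a.txt directory
```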

Transformation

A transformation returns a new RDD; the original RDD is immutable and stays unchanged.
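A minimal sketch of that immutability:

```py
>>> rdd = sc.parallelize([1, 2, 3])
>>> doubled = rdd.map(lambda x: x * 2)  # a new RDD is returned
>>> rdd.collect()                       # the original RDD is unchanged
[1, 2, 3]
>>> doubled.collect()
[2, 4, 6]
```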

filter/map/reduce

filter

filter(f) returns an RDD containing only the elements for which f returns true.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]

map

map(f) applies f to every element and returns the resulting RDD.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]

flatMap

flatMap(f) applies f to every element and returns an RDD in which the resulting lists are flattened into individual elements.

>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']

reduce

reduce(f) repeatedly applies f to pairs of elements until a single value remains.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6

Operations on pair RDD

Create a pair RDD

In Python, a pair RDD is an RDD whose elements are tuples, which lets you work with keys and values. To create one, use `keyBy`, or use `map` with a function that returns a 2-element tuple, as sketched below.
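A minimal sketch of the `map` approach (the input strings are hypothetical):

```py
>>> rdd = sc.parallelize(["Ken 27", "Bob 32", "Meg 29"])
>>> rdd.map(lambda x: (x.split()[0], int(x.split()[1]))).collect()
[('Ken', 27), ('Bob', 32), ('Meg', 29)]
```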

keyBy

keyBy(f) applies f to each element of an ordinary RDD and returns a pair RDD with the return value as the key and the original element as the value.

>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]

keys

keys() returns an RDD consisting only of the keys of the pair RDD.

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']

values

values() returns an RDD consisting only of the values of the pair RDD.

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]

flatMapValues

flatMapValues(f) applies flatMap to each value of the pair RDD, duplicating the key for every produced value (turning the data into a so-called long/vertical format).

>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(",")).collect()
[('Ken', 'Yumi'),
 ('Ken', 'Yukiko'),
 ('Bob', 'Meg'),
 ('Bob', ' Tomomi'),
 ('Bob', ' Akira'),
 ('Taka', 'Yuki')]

reduceByKey

reduceByKey(f) groups elements with the same key and applies reduce to their values.

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]

countByKey

countByKey() counts how many elements there are for each key and returns the result as a dict.

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})

sortByKey

sortByKey() sorts the pair RDD by key.

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]

Join operation

leftOuterJoin

leftOuterJoin(rdd) left-outer-joins two pair RDDs by key and returns a pair RDD whose value is a tuple of the two values (None where the right side has no match).

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]

rightOuterJoin

rightOuterJoin(rdd) right-outer-joins two pair RDDs by key and returns a pair RDD whose value is a tuple of the two values (None where the left side has no match).

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]

fullOuterJoin

fullOuterJoin(rdd) full-outer-joins two pair RDDs by key and returns a pair RDD whose value is a tuple of the two values (None where either side has no match).

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]

Sort operation

sortBy

sortBy(f) sorts by the value returned by f.

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortBy(lambda (x, y): x).collect() #Same as sortByKey

Set operation etc.

intersection

intersection(rdd) returns the intersection of two RDDs.
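A minimal sketch (hypothetical values; the element order of the result is not guaranteed, so it is sorted here):

```py
>>> rdd1 = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([3, 4, 5])
>>> sorted(rdd1.intersection(rdd2).collect())
[3, 4]
```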

union

union(rdd) returns the union of two RDDs (duplicates are kept).
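A minimal sketch (hypothetical values; note that duplicates are kept):

```py
>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([3, 4])
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 3, 4]
```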

zip

zip(rdd) returns a pair RDD whose keys are the elements of this RDD and whose values are the corresponding elements of the argument RDD.

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values()).collect()
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]

distinct

distinct() returns an RDD with duplicate elements removed.
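A minimal sketch (hypothetical values; the result order is not guaranteed, so it is sorted here):

```py
>>> rdd = sc.parallelize([1, 2, 2, 3, 3, 3])
>>> sorted(rdd.distinct().collect())
[1, 2, 3]
```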

Sampling operation

sample

sample(withReplacement, fraction) returns a sampled RDD. The first argument determines whether the same element may be drawn more than once (sampling with replacement).

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]

takeSample

takeSample(withReplacement, num) returns a list of num sampled elements. The first argument determines whether the same element may be drawn more than once.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]

Debug

toDebugString

toDebugString() returns the execution plan (the RDD lineage).

print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[188] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []

Persistence

persist

persist() caches the RDD as is (in memory by default). The storage level (memory only, spill to disk when memory is insufficient, disk only, etc.) is specified with StorageLevel.

>>> rdd.persist()

unpersist

unpersist() removes the persistence of the RDD. Use it when changing the persistence level.

>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)

Common examples

More examples will be added over time.

word count

>>> rdd.flatMap(lambda x: x.split())\
       .map(lambda x: (x, 1))\
       .reduceByKey(lambda x, y: x + y)\
       .take(5)

DataFrame

A DataFrame is especially convenient when dealing with structured data.

Create a DataFrame (read data)

read.json

read.json(file) reads data from a JSON file (one JSON object per line).



```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
>>> df = sqlContext.read.json("a.json")
```

Display DataFrame

In addition to `collect` and `take`, which work the same as for RDDs, there is `show`.

show

show(n) displays the first n rows (n is 20 by default).

>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

DataFrame operations

select

select(column) returns a DataFrame with the selected column(s); you can pass a string or a Column object. You can also list several columns to select multiple columns, or perform calculations on them.

>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+

# The following two are equivalent to the above
>>> df.select(df.age).show()    # pass a Column object
>>> df.select(df["age"]).show() # pass a Column object

# Select multiple columns
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
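Calculations on columns can also be passed to select (a minimal sketch; output omitted):

```py
>>> # Select the name together with a computed column (age next year)
>>> df.select(df.name, df.age + 1).show()
```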

Column objects of a DataFrame

There are two patterns in Python for accessing the Column object passed by select:

>>> df.age
Column<age>
>>> df["age"]
Column<age>

where/filter

filter(condition) returns a DataFrame consisting only of the rows that satisfy the condition (a Column expression or SQL string). `where` is an alias for `filter`.

>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
+---+----+------+

sort

sort(column) returns a DataFrame sorted by the specified column.

>>> df.sort(df.age).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg|    45|
| 30| Bob|    80|
| 35| Ken|  null|
+---+----+------+

limit

limit(n) returns a DataFrame limited to the first n rows.

>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
+---+----+------+

distinct

distinct() returns a DataFrame consisting only of the distinct rows.

>>> df.distinct().count()
3

join

join(dataframe, on, how) joins with another DataFrame. `how` defaults to "inner".

- on: a column name, a Column, or a list of them
- how: one of `"inner"`, `"outer"`, `"left_outer"`, `"right_outer"`, `"leftsemi"`
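A minimal sketch of an inner join on a shared column (`df2` is a hypothetical second DataFrame created inline):

```py
>>> # A second, hypothetical DataFrame that shares the "name" column
>>> df2 = sqlContext.createDataFrame([("Ken", "Tokyo"), ("Meg", "Osaka")], ["name", "city"])
>>> # Inner join on "name"; rows without a match ("Bob") are dropped
>>> df.join(df2, "name", "inner").show()
```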

Convert from DataFrame to RDD

Since a DataFrame is built on top of an RDD, the underlying RDD of Row objects can be retrieved with `df.rdd`.

>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
 Row(age=30, name=u'Bob', weight=80),
 Row(age=29, name=u'Meg', weight=45)]

To retrieve only a specific column, access the corresponding attribute of the Row object

>>> df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]

Save the DataFrame

toJSON

toJSON() converts the DataFrame into an RDD of JSON strings. Calling saveAsTextFile afterwards saves it in JSON format.

>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("b.json")
>>> df2.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

Going forward

Items related to Spark Streaming and MLlib may be added here.
