[PYTHON] Spark API cheat sheet


I want to keep a note of Spark's frequently used APIs (mainly for myself) so that I can get going quickly even when I come back to development after a long break. For now this covers the Python version (I may add a Scala version if I have time).

**This cheat sheet is just a cheat sheet** (arguments may be omitted), so if you have time, please check the [official API documentation (Spark Python API Docs)](http://spark.apache.org/docs/latest/api/python/index.html).

Spark API cheat sheet (Python)

The examples below assume the following setup:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)


Create RDD (read data)


sc.parallelize(collection): Creates an RDD from a list or tuple

>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)


sc.textFile(file): Reads a file, one line per element. Wildcards (glob patterns such as *.txt) can also be used in the path.

>>> rdd = sc.textFile("./words.txt")


sc.wholeTextFiles(directory): Reads each file in the directory in full as one element of the RDD, as a (path, content) pair

# $ ls
# a.json b.json c.json
>>> rdd = sc.wholeTextFiles("./")


Action: transformations are executed, in order, only when an action is called (lazy evaluation)
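Lazy evaluation can be illustrated without a cluster. The following is a plain-Python sketch, not Spark code: `LazySeq` is a hypothetical toy class that only records `map` and `filter` steps and replays them when `collect()` is called, which is the behaviour described above.

```python
class LazySeq:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # immutable, so each transformation yields a new object

    def map(self, f):
        # Transformation: nothing is computed here, the step is only recorded.
        return LazySeq(self._data, self._steps + (("map", f),))

    def filter(self, f):
        # Transformation: also only recorded.
        return LazySeq(self._data, self._steps + (("filter", f),))

    def collect(self):
        # Action: replay the recorded steps in order and materialize the result.
        items = iter(self._data)
        for kind, f in self._steps:
            items = map(f, items) if kind == "map" else filter(f, items)
        return list(items)


calls = []  # records every element that `double` actually processes


def double(x):
    calls.append(x)
    return x * 2


pipeline = LazySeq([1, 2, 3]).map(double).filter(lambda x: x > 2)
calls_before_action = list(calls)  # snapshot taken before any action has run
result = pipeline.collect()        # the action triggers the whole chain
print(calls_before_action, result, calls)
```

Until `collect()` is called, `double` has not been applied to anything; the action is what triggers the work.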

Actions that return elements


collect(): Returns all elements

>>> print(rdd.collect())
[1, 2, 3, 4, 5]


take(n): Returns the first n elements

>>> print(rdd.take(3))
[1, 2, 3]


first(): Returns the very first element

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1


top(n): Returns the n largest elements, in descending order

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]

Actions that return a (statistical) quantity


count(): Counts the elements and returns their number

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3


mean(): Returns the average

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
2.0


sum(): Returns the total

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6


variance(): Returns the (population) variance

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()


stdev(): Returns the (population) standard deviation

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
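One detail worth keeping in mind: variance() and stdev() divide by N (population statistics), while PySpark also provides sampleVariance() and sampleStdev(), which divide by N - 1. The sketch below uses Python's standard `statistics` module as a local stand-in to show the difference on the same data; it does not call Spark.

```python
import statistics

data = [1.0, 2.0, 3.0]

# Population statistics (divide by N): the counterpart of variance() / stdev().
pop_var = statistics.pvariance(data)
pop_std = statistics.pstdev(data)

# Sample statistics (divide by N - 1): the counterpart of sampleVariance() / sampleStdev().
sample_var = statistics.variance(data)
sample_std = statistics.stdev(data)

print(pop_var, pop_std)
print(sample_var, sample_std)
```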

Actions that save data


saveAsTextFile(path): Saves the RDD as text. The path is created as a directory containing one part file per partition.

>>> rdd.saveAsTextFile("./a.txt")


Transformation: returns a new, immutable RDD



filter(f): Returns an RDD containing only the elements for which f is true

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]


map(f): Returns an RDD with f applied to every element

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]


flatMap(f): Applies f to every element, then returns an RDD in which the lists produced for each element are flattened

>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
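The difference between map and flatMap is easy to see on a local list. This is a plain-Python sketch of the semantics only (list comprehensions and `itertools.chain` stand in for the RDD methods):

```python
from itertools import chain

lines = ["This is a pen", "This is an apple"]

# map: exactly one output element per input element (here, one list per line).
mapped = [line.split() for line in lines]

# flatMap: same function, but the per-element lists are concatenated into one sequence.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```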


reduce(f): Repeatedly applies f to two elements at a time until a single value remains. Note that reduce is an action: it returns a value, not an RDD.

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6

Operations on pair RDD

Create a pair RDD

A pair RDD is an RDD whose elements are tuples in Python, which lets you work with keys and values. To create one, use `keyBy`, or use `map` to return a two-element tuple for each element.


keyBy(f): Applies f to each element of an ordinary RDD and returns an RDD with the return value as the key and the original element as the value

>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]


keys(): Returns an RDD consisting only of the keys of the pair RDD

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']


values(): Returns an RDD consisting only of the values of the pair RDD

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]


flatMapValues(f): Applies flatMap to the values of a pair RDD, repeating the key for each resulting value (the so-called long, or vertical, format)

>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(",")).collect()
[('Ken', 'Yumi'),
 ('Ken', 'Yukiko'),
 ('Bob', 'Meg'),
 ('Bob', ' Tomomi'),
 ('Bob', ' Akira'),
 ('Taka', 'Yuki')]
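Note the leading spaces in ' Tomomi' and ' Akira' above: splitting on "," alone keeps the space after each comma. The following plain-Python sketch models flatMapValues locally (the helper `flat_map_values` is hypothetical, not a Spark function) and shows one way to strip the pieces.

```python
pairs = [("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")]


def flat_map_values(f, kv_pairs):
    """Local model: apply f to each value and emit one (key, item) pair per item."""
    return [(key, item) for key, value in kv_pairs for item in f(value)]


# Splitting on "," alone keeps the space that follows each comma.
raw = flat_map_values(lambda v: v.split(","), pairs)

# Stripping each piece removes it.
clean = flat_map_values(lambda v: [name.strip() for name in v.split(",")], pairs)

print(raw)
print(clean)
```

The same lambda should work unchanged as the argument to `rdd.flatMapValues(...)`.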


reduceByKey(f): Groups elements that have the same key and applies reduce to their values

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]


countByKey(): Counts the number of elements for each key and returns the result as a dict

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
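reduceByKey and countByKey are easy to confuse: the first combines the values of each key, the second only counts how often each key occurs. A plain-Python sketch of both on the same data follows; `reduce_by_key` is a hypothetical local helper, and Spark itself combines values per partition before shuffling rather than building full groups as done here.

```python
from collections import Counter
from functools import reduce

pairs = [("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)]


def reduce_by_key(f, kv_pairs):
    """Local model: collect the values of each key, then fold each group with f."""
    groups = {}
    for key, value in kv_pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(f, values) for key, values in groups.items()}


totals = reduce_by_key(lambda x, y: x + y, pairs)  # sums the values per key
counts = Counter(key for key, _ in pairs)          # counts the occurrences per key

print(totals)
print(dict(counts))
```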


sortByKey(): Sorts a pair RDD by key

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 3), ("aaa", 2)])
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]

Join operation

leftOuterJoin(rdd): Performs a left outer join of two RDDs and returns a pair RDD whose values are two-element tuples

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]

rightOuterJoin(rdd): Performs a right outer join of two RDDs and returns a pair RDD whose values are two-element tuples

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (None, 3))]

fullOuterJoin(rdd): Performs a full outer join of two RDDs and returns a pair RDD whose values are two-element tuples

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
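Where None ends up in each outer join can be checked with a small local model. This is a plain-Python sketch under stated assumptions: `outer_join` is a hypothetical helper, it assumes keys are unique on each side, and it sorts its output by key, whereas Spark gives no ordering guarantee. The value tuple is always (value from the left RDD, value from the right RDD).

```python
rdd1 = [("Ken", 1), ("Bob", 2), ("Meg", 3)]
rdd2 = [("Ken", 1), ("Kaz", 3)]


def outer_join(left, right, keep_left, keep_right):
    """Local model of an outer join on key; assumes unique keys on each side."""
    lmap, rmap = dict(left), dict(right)
    # Keys from the left side: all of them, or only the matched ones.
    keys = [k for k in lmap if keep_left or k in rmap]
    # Keys that exist only on the right side.
    keys += [k for k in rmap if k not in lmap and keep_right]
    # dict.get returns None for the side that has no matching key.
    return sorted((k, (lmap.get(k), rmap.get(k))) for k in keys)


left_outer = outer_join(rdd1, rdd2, keep_left=True, keep_right=False)
right_outer = outer_join(rdd1, rdd2, keep_left=False, keep_right=True)
full_outer = outer_join(rdd1, rdd2, keep_left=True, keep_right=True)

print(left_outer)
print(right_outer)
print(full_outer)
```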

Sort operation


sortBy(f): Sorts by the value returned by f

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 3), ("aaa", 2)])
>>> rdd.sortBy(lambda kv: kv[0]).collect()  # Same as sortByKey

Set operations etc.


intersection(rdd): Returns the intersection of two RDDs


union(rdd): Returns the union of two RDDs (duplicates are kept)

zip(rdd): Returns a pair RDD whose keys are the elements of this RDD and whose values are the elements of the argument RDD

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values()).collect()
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]

distinct(): Returns an RDD with duplicate elements removed

Sampling operations


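sample(bool, frac): Returns a sampled RDD. The first argument determines whether the same element may be selected more than once (sampling with replacement). frac is the expected fraction of elements, so the size of the result varies.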

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]


takeSample(bool, size): Returns a list containing a sample of fixed size. The first argument determines whether the same element may be selected more than once.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
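The practical difference between sample and takeSample is the size of the result. The sketch below is a plain-Python model of the without-replacement case only: it assumes that sample keeps each element independently with probability frac, and the helper names and the seed are chosen for illustration.

```python
import random


def sample_without_replacement(data, frac, rng):
    """Local model of sample(False, frac): keep each element with probability frac."""
    return [x for x in data if rng.random() < frac]


def take_sample_without_replacement(data, size, rng):
    """Local model of takeSample(False, size): draw exactly `size` distinct elements."""
    return rng.sample(data, size)


rng = random.Random(42)  # fixed seed, chosen arbitrarily for repeatability
data = [1, 2, 3, 4, 5]

# Result lengths observed over many repeated draws.
sample_sizes = {len(sample_without_replacement(data, 0.5, rng)) for _ in range(200)}
take_sizes = {len(take_sample_without_replacement(data, 2, rng)) for _ in range(200)}

print(sorted(sample_sizes))  # the length changes from draw to draw
print(sorted(take_sizes))    # the length is always the requested size
```

Use takeSample when an exact number of elements is needed and the result fits on the driver; use sample when the result should stay an RDD.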



toDebugString(): Returns the execution plan

>>> print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[188] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []



persist(): Caches the RDD as is (in memory by default). You can choose memory only, disk when it does not fit in memory, disk only, and so on (specified with StorageLevel).

>>> rdd.persist()


unpersist(): Releases the persisted RDD. Used when changing the persistence level.

>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
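Why persisting matters: without it, every action recomputes the whole chain of transformations. The following toy model in plain Python counts how often the mapped function runs. `ToyRDD` is a hypothetical class for illustration; real Spark caches per partition and may evict cached data, which this sketch ignores.

```python
class ToyRDD:
    """Toy stand-in for an RDD holding one recorded map step."""

    def __init__(self, data, f):
        self._data = list(data)
        self._f = f
        self._persisted = False
        self._cache = None
        self.evaluations = 0  # how many times f has been applied so far

    def persist(self):
        self._persisted = True
        return self

    def unpersist(self):
        self._persisted = False
        self._cache = None
        return self

    def collect(self):
        if self._cache is not None:
            return list(self._cache)  # served from the cache, f is not called
        result = []
        for x in self._data:
            self.evaluations += 1
            result.append(self._f(x))
        if self._persisted:
            self._cache = result      # keep the result for later actions
        return list(result)


toy = ToyRDD([1, 2, 3], lambda x: x * x)

toy.collect()
toy.collect()
without_cache = toy.evaluations               # both actions recomputed every element

toy.persist()
toy.collect()
toy.collect()
with_cache = toy.evaluations - without_cache  # only the first action computed

print(without_cache, with_cache)
```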

Common examples

More will be added over time

Word count

>>> rdd.flatMap(lambda x: x.split())\
       .map(lambda x: (x, 1))\
       .reduceByKey(lambda x, y: x + y)

DataFrame: especially convenient when dealing with structured data.

Create a DataFrame (read data)


read.json(file): Reads data from JSON (one JSON object per line)

# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")

Display DataFrame

In addition to `collect` and `take`, which work the same as for RDDs, there is `show`.


show(n): Displays n rows (n is 20 by default)

>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

DataFrame operations


select(column): Returns a DataFrame containing the selected columns. Pass a string or a Column object. You can also list several columns or pass expressions that perform calculations.

>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+

# The next two are equivalent to the above
>>> df.select(df.age).show()     # Pass a Column object
>>> df.select(df["age"]).show()  # Pass a Column object

>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+

Column object of a DataFrame

There are two ways in Python to access the Column objects passed to select:

>>> df.age
>>> df["age"]


filter(condition): Returns a DataFrame containing only the rows that satisfy the condition. where is an alias for filter.

>>> df.where(df.age >= 30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
+---+----+------+


sort(column): Returns a DataFrame sorted by the specified column

>>> df.sort(df.age).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg|    45|
| 30| Bob|    80|
| 35| Ken|  null|
+---+----+------+


limit(n): Returns a DataFrame limited to the first n rows

>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
+---+----+------+


distinct(): Returns a DataFrame consisting only of distinct rows

>>> df.distinct().count()
3


join(dataframe, on, how): Joins with another DataFrame. how defaults to inner.

- on: a column or a list of columns
- how: one of "inner", "outer", "left_outer", "right_outer", "leftsemi"
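Since no join example is given above, here is a plain-Python sketch of what three of the `how` values mean. It is a local model over lists of dicts, not DataFrame code: the `join` helper and the `cities` data are invented for this illustration, and only a single join column is handled.

```python
people = [
    {"name": "Ken", "age": 35},
    {"name": "Bob", "age": 30},
    {"name": "Meg", "age": 29},
]
# Hypothetical second table, invented for this illustration.
cities = [
    {"name": "Ken", "city": "Tokyo"},
    {"name": "Kaz", "city": "Osaka"},
]


def join(left, right, on, how="inner"):
    """Local model of a single-column equi-join over lists of dicts."""
    right_cols = sorted({col for row in right for col in row if col != on})
    out = []
    for lrow in left:
        matches = [rrow for rrow in right if rrow[on] == lrow[on]]
        if how == "leftsemi":
            if matches:
                out.append(dict(lrow))  # left columns only, at most once per left row
        elif matches:
            out.extend({**lrow, **rrow} for rrow in matches)
        elif how == "left_outer":
            # No match: keep the left row and fill the right columns with None.
            out.append({**lrow, **{col: None for col in right_cols}})
    return out


inner = join(people, cities, "name")
left_outer = join(people, cities, "name", how="left_outer")
left_semi = join(people, cities, "name", how="leftsemi")

print(inner)
print(left_outer)
print(left_semi)
```

"leftsemi" acts as a filter on the left side: it keeps the left rows that have a match and never adds columns from the right side.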

Convert from DataFrame to RDD

Since the DataFrame is built on the RDD, the original RDD can be retrieved.

>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
 Row(age=30, name=u'Bob', weight=80),
 Row(age=29, name=u'Meg', weight=45)]

To retrieve only specific columns, access the corresponding attributes of the Row object

>>> df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]

Save the DataFrame


toJSON(): Converts to an RDD of JSON strings. You can then save it in JSON format by calling saveAsTextFile.

>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("b.json")
>>> df2.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

Future plans

Spark Streaming and MLlib related items may be added here.

