[PYTHON] Aide-mémoire de l'API Spark

Objectif

Je veux prendre note de l'API fréquemment utilisée de Spark (principalement pour moi-même) afin qu'elle puisse être utilisée rapidement même lors du développement pour la première fois depuis longtemps. Je vais résumer la version Python pour le moment (je peux ajouter la version Scala si j'ai le temps)

** Cette feuille de triche est juste une feuille de triche ** (les arguments peuvent être omis), donc si vous avez le temps, veuillez vous assurer [Official API Document (Spark Python API Docs)](http: //spark.apache. Voir org / docs / latest / api / python / index.html).

Aide-mémoire de l'API Spark (Python)

Ce qui suit suppose ce qui suit

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

RDD

Créer RDD (lire les données)

parallelize

sc.parallelize(collection)Créer un RDD à partir d'une liste ou d'une touche



```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)

textFile

sc.textFile(file)Lisez le fichier. Vous pouvez également utiliser des caractères génériques et des expressions régulières.



```py
>>> rdd = sc.textFile("./words.txt")

wholeTextFiles

sc.wholeTextFiles(dierctory)Entrez le contenu entier de chaque fichier dans le répertoire dans un élément du RDD



```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")

Action

La transformation est exécutée dans l'ordre pour la première fois lorsque l'action est exécutée (exécution retardée)

Ce qui renvoie un élément

collect

collect()Renvoie tous les éléments

>>> print(rdd.collect())
[1, 2, 3, 4, 5]

take

take(n)Retourne d'abord n éléments

>>> print(rdd.take(3))
[1, 2, 3]

first

first()Renvoie le tout premier élément

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1

top

top(n)Renvoie n éléments du plus grand

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]

Qu'est-ce qui renvoie la quantité (statistique)

count

count()Compter et renvoyer le nombre d'éléments

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3

mean

mean()Renvoie la moyenne

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0

sum

sum()Renvoie le total

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6

variance

variance()Renvoie la distribution

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666

stdev

stdev()Renvoie l'écart type

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726

Quoi économiser

saveAsTextFile

saveastextfile(file)Enregistrez le fichier

>>> rdd.saveAsTextFile("./a.txt")

Transformation

La transformation renvoie un nouveau RDD immuable

filter/map/reduce

filter

filter(f)Renvoie un rdd contenant uniquement les éléments pour lesquels f est vrai

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]

map

map(f)Renvoie rdd avec f agissant sur tous les éléments

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]

flatMap

flatmap(f)Après avoir appliqué f à tous les éléments, retournez rdd qui a développé la liste dans l'élément

>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']

Reduce

reduce(f)Continuez à agir sur deux éléments pour obtenir une valeur de retour

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6

Opérations sur paire RDD

Créer une paire RDD

Pair RDD est un RDD qui a Tuple comme élément en Python. Peut gérer la clé et la valeur. Pour le faire, utilisez keyBy ou utilisez `` map pour renvoyer un taple avec 2 éléments à l'élément.

keyBy(PairRDD)

keyby(f)Laisser f agir sur l'élément de rdd ordinaire et renvoyer rdd avec sa valeur de retour comme clé et l'élément d'origine comme valeur.

>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]

keys

keysRenvoie un rdd composé uniquement des clés de la paire rdd

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']

values

valuesRenvoie un rdd constitué uniquement de vlaue de rdd apparié

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]

flatMapValues

flatmapvalues(f)Appliquez flatmap à la valeur de pairrdd pour dupliquer la clé et en faire ce que l'on appelle un maintien vertical

>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
 ('Ken', 'Yukiko'),
 ('Bob', 'Meg'),
 ('Bob', ' Tomomi'),
 ('Bob', ' Akira'),
 ('Taka', 'Yuki')]

reduceByKey

reducebykey(f)Regrouper par éléments avec la même clé et appliquer la réduction à la valeur

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]

countByKey

countbykey()Compter le nombre de valeurs de la même clé et renvoyer avec dict

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})

sortByKey

sortbykeyTrier la paire rdd par clé

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]

Rejoindre l'opération

leftOuterJoin L'extérieur gauche joint deux RDD et renvoie une paire RDD avec un tuple de deux éléments en valeur

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]

rightOuterJoin Jointure externe droite de deux RDD et retourne une paire de RDD avec un tuple de deux éléments en valeur

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]

fullOuterJoin Jointure externe complète de deux RDD et retour d'une paire de RDD avec un tuple de deux éléments en valeur

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]

Opération de tri

sortBy

sortby(f)Trier par la valeur renvoyée par f

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortBy(lambda (x, y): x).collect() #Identique à sortByKey

Opération collective etc.

intersection

intersection(rdd)Renvoie une intersection de deux rdd

union

union(rdd)Renvoie l'union de deux rdd

zip zip(rdd)Renvoie une paire rdd avec chaque élément de l'argument rdd comme vlaue

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]

distinct Renvoie un RDD qui ne contient pas les mêmes éléments

Opération d'échantillonnage

sample

sample(bool, frac)Renvoie le rdd échantillonné. Le premier argument détermine si le même élément peut être dupliqué.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]

takeSample

takesmaple(bool, size)Renvoie une liste d'échantillons de taille fixe. Le premier argument détermine si le même élément peut être dupliqué.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]

déboguer

toDebugString

todebugstring()Renvoie le plan d'exécution

print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[188] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []

Persistance

persist

persist()Cache rdd tel quel (en mémoire par défaut). Vous pouvez définir uniquement la mémoire, le disque si la mémoire n'est pas possible, le disque uniquement, etc. (storagelevelSpécifié par)

>>> rdd.persist()

unpersist

unpersist()Résolvez la persistance de rdd. Utilisé lors du changement du niveau de persistance.

>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)

Exemples courants

Sera ajouté à tout moment

word count

>>> rdd.flatMap(lambda x: x.split())\
       .map(lambda x: (x, 1))\
       .reduceByKey(lambda x, y: x + y)\
       .take(5)

DataFrame Ceci est particulièrement pratique lorsqu'il s'agit de données structurées.

Créer un DataFrame (lire les données)

read.json

read.json(file)Lire les données de json



```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")

Afficher DataFrame

Il y a show '' en plus de collect '', `` take '' qui est identique à RDD

show

show(n)Afficher n lignes (n vaut 20 par défaut)

>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

Opérations DataFrame

select

select(column)Renvoie le dataframe sélectionné, en passant un objet chaîne ou colonne. Vous pouvez également énumérer des colonnes pour obtenir plusieurs colonnes ou effectuer des calculs.

>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+

#Idem pour le suivant
>>> df.select(df.age).show() #Passer un objet Column
>>> df.select(df["age"]).show() #Passer un objet Column
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
Objet Colonne Dataframe

Il existe deux modèles en Python pour accéder à l'objet Column passé par select:

>>> df.age
Column<age>
>>> df["age"]
Column<age>

where/filter

filter(condition)Renvoie une trame de données composée uniquement de lignes répondant aux critères de chaîne.whereEstfilterC'est un alias de.

>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
+---+----+------+

sort

sort(column)Renvoie un dataframe trié par la colonne spécifiée

>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg|    45|
| 30| Bob|    80|
| 35| Ken|  null|
+---+----+------+

limit

limit(n)Renvoie une trame de données limitée aux n premières lignes uniquement

>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
+---+----+------+

distinct

distinct()Renvoie une trame de données constituée uniquement des lignes de résultat distinctes

>>> df.distinct().count()
3

join

join(dataframe, on, how)comment la valeur par défaut est intérieure

--on: Colonne ou liste de colonnes --how: `" intérieur "`, "extérieur" , "gauche_outer" , `right_outer" , `leftsemi" ` L'un des

Convertir de Dataframe en RDD

Puisque le DataFrame est construit sur le RDD, le RDD original peut être récupéré.

>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
 Row(age=30, name=u'Bob', weight=80),
 Row(age=29, name=u'Meg', weight=45)]

Accédez aux attributs correspondants de l'objet `` Row '' pour récupérer uniquement des colonnes spécifiques

df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]

Enregistrez le Dataframe

toJson

tojson()Convertissez en rdd sous la forme de json. après çasaveastextfileVous pouvez l'enregistrer au format json en appelant.

>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

à partir de maintenant

Des éléments liés à Spark Streaming et Mllib peuvent être ajoutés ici.

Recommended Posts

Aide-mémoire de l'API Spark
Aide-mémoire sur les pièces jointes de l'API Slack
Aide-mémoire au curry
Aide-mémoire SQLite3
feuille de triche pyenv
feuille de triche de commande conda
Aide-mémoire PIL / Pillow
feuille de triche de commande ps
Aide-mémoire Python3 (basique)
Fiche technique PySpark [Python]
Feuille de triche de tri Python
feuille de triche de fichier de réglage tox
feuille de triche de réutilisation de la mémoire numpy
[Python3] Entrée standard [Cheet sheet]
Fiche technique de la science des données (Python)
Fiche technique du didacticiel Python Django
feuille de triche de l'algorithme scikit learn
Apache Beam Cheet Sheet [Python]
Aide-mémoire personnel Google Test / Mock
Aide-mémoire sur le style de livraison continue (CPS)
Aide-mémoire Python (pour les expérimentés C ++)
Aide-mémoire sur le curry [liste de la version de l'exemple de description]
Fiche de triche AtCoder en python (pour moi-même)
Fiche technique de l'accès aux données Blender Python Mesh
Feuille de calcul du modélisateur d'optimisation mathématique (PuLP) (Python)