[PYTHON] Spark API Spickzettel

Zweck

Ich möchte die häufig verwendete API von Spark (hauptsächlich für mich selbst) notieren, damit sie auch bei der ersten Entwicklung seit langer Zeit schnell verwendet werden kann. Ich werde die Python-Version vorerst zusammenfassen (ich kann die Scala-Version hinzufügen, wenn ich Zeit habe)

** Dieser Spickzettel ist nur ein Spickzettel ** (Argumente können weggelassen werden). Wenn Sie also Zeit haben, stellen Sie bitte sicher, dass [Offizielles API-Dokument (Spark Python API Docs)](http: //spark.apache). Siehe org / docs / latest / api / python / index.html).

Spark API Spickzettel (Python)

Das Folgende setzt Folgendes voraus

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

RDD

RDD erstellen (Daten lesen)

parallelize

sc.parallelize(collection)Erstellen Sie eine RDD aus einer Liste oder einem Taple



```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)

textFile

sc.textFile(file)Lesen Sie die Datei. Sie können auch Platzhalter und reguläre Ausdrücke verwenden.



```py
>>> rdd = sc.textFile("./words.txt")

wholeTextFiles

sc.wholeTextFiles(dierctory)Geben Sie den gesamten Inhalt jeder Datei im Verzeichnis in ein Element der RDD ein



```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")

Action

Die Transformation wird zum ersten Mal ausgeführt, wenn die Aktion ausgeführt wird (verzögerte Ausführung).

Was gibt ein Element zurück

collect

collect()Gibt alle Elemente zurück

>>> print(rdd.collect())
[1, 2, 3, 4, 5]

take

take(n)Gibt zuerst n Elemente zurück

>>> print(rdd.take(3))
[1, 2, 3]

first

first()Gibt das allererste Element zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1

top

top(n)Gibt n Elemente vom größten zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]

Was gibt die (Statistik-) Menge zurück?

count

count()Zählen Sie die Anzahl der Elemente und geben Sie sie zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3

mean

mean()Gibt den Durchschnitt zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0

sum

sum()Gibt die Summe zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6

variance

variance()Gibt die Verteilung zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666

stdev

stdev()Gibt die Standardabweichung zurück

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726

Was zu speichern

saveAsTextFile

saveastextfile(file)Speicher die Datei

>>> rdd.saveAsTextFile("./a.txt")

Transformation

Die Transformation gibt eine neue unveränderliche RDD zurück

filter/map/reduce

filter

filter(f)Gibt eine rdd zurück, die nur Elemente enthält, für die f wahr ist

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]

map

map(f)Gibt rdd zurück, wobei f auf alle Elemente wirkt

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]

flatMap

flatmap(f)Nachdem Sie f auf alle Elemente angewendet haben, geben Sie rdd zurück, wodurch die Liste im Element erweitert wurde

>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']

Reduce

reduce(f)Wirken Sie weiterhin auf zwei Elemente, um einen Rückgabewert zu erhalten

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6

Operationen am Paar RDD

Erstellen Sie ein RDD-Paar

Pair RDD ist ein RDD mit Tuple als Element in Python. Kann mit Schlüssel und Wert umgehen. Verwenden Sie dazu `keyBy``` oder` map```, um einen Taple mit 2 Elementen an das Element zurückzugeben.

keyBy(PairRDD)

keyby(f)Lassen Sie f auf ein gewöhnliches Element von rdd einwirken und geben Sie rdd mit seinem Rückgabewert als Schlüssel und dem ursprünglichen Element als Wert zurück.

>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]

keys

keysGibt eine rdd zurück, die nur aus den Schlüsseln des Paares rdd besteht

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']

values

valuesGibt eine rdd zurück, die nur aus dem Wert der gepaarten rdd besteht

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]

flatMapValues

flatmapvalues(f)Wenden Sie flatmap auf den Wert von pairrdd an, um den Schlüssel zu duplizieren und ihn als vertikales Halten zu bezeichnen

>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
 ('Ken', 'Yukiko'),
 ('Bob', 'Meg'),
 ('Bob', ' Tomomi'),
 ('Bob', ' Akira'),
 ('Taka', 'Yuki')]

reduceByKey

reducebykey(f)Nach Elementen mit demselben Schlüssel gruppieren und auf Wert reduzieren anwenden

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]

countByKey

countbykey()Zählen Sie, wie viele Werte desselben Schlüssels vorhanden sind, und kehren Sie mit dict zurück

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})

sortByKey

sortbykeySortieren Sie das Paar rdd nach Schlüssel

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]

Join-Operation

leftOuterJoin Links außen zwei RDDs verbinden und ein Paar RDD mit einem Tupel von zwei Wertelementen zurückgeben

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]

rightOuterJoin Rechts außen zwei RDDs verbinden und ein Paar RDD mit einem Tupel von zwei Wertelementen zurückgeben

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]

fullOuterJoin Vollständige äußere Verknüpfung zweier RDDs und Rückgabe eines RDD-Paares mit einem Tupel aus zwei Wertelementen

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]

Sortiervorgang

sortBy

sortby(f)Sortieren nach dem von f zurückgegebenen Wert

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortBy(lambda (x, y): x).collect() #Gleich wie sortByKey

Kollektivbetrieb etc.

intersection

intersection(rdd)Gibt einen Schnittpunkt von zwei rdd zurück

union

union(rdd)Gibt die Vereinigung von zwei rdd zurück

zip zip(rdd)Gibt ein Paar rdd mit jedem Element des Arguments rdd als vlaue zurück

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]

distinct Gibt eine RDD zurück, die nicht dieselben Elemente enthält

Probenahmevorgang

sample

sample(bool, frac)Gibt die abgetastete Festplatte zurück. Das erste Argument bestimmt, ob dasselbe Element dupliziert werden kann.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]

takeSample

takesmaple(bool, size)Gibt eine Liste von Mustern mit fester Größe zurück. Das erste Argument bestimmt, ob dasselbe Element dupliziert werden kann.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]

debuggen

toDebugString

todebugstring()Gibt den Ausführungsplan zurück

print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[188] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []

Beharrlichkeit

persist

persist()Cache rdd wie es ist (standardmäßig im Speicher). Sie können nur Speicher, Festplatte, wenn Speicher nicht möglich ist, nur Festplatte usw. einstellen. (storagelevelSpezifiziert durch)

>>> rdd.persist()

unpersist

unpersist()Lösen Sie die Persistenz von rdd. Wird beim Ändern der Persistenzstufe verwendet.

>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)

Häufige Beispiele

Wird jederzeit hinzugefügt

word count

>>> rdd.flatMap(lambda x: x.split())\
       .map(lambda x: (x, 1))\
       .reduceByKey(lambda x, y: x + y)\
       .take(5)

DataFrame Dies ist besonders praktisch, wenn Sie mit strukturierten Daten arbeiten.

Erstellen Sie einen DataFrame (Daten lesen)

read.json

read.json(file)Lesen Sie Daten von json



```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")

DataFrame anzeigen

Es gibt `show``` zusätzlich zu` collect```, `take

show

show(n)N Zeilen anzeigen (n ist standardmäßig 20)

>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

DataFrame-Operationen

select

select(column)Gibt den ausgewählten Datenrahmen zurück und übergibt eine Zeichenfolge oder ein Spaltenobjekt. Sie können auch Spalten auflisten, um mehrere Spalten abzurufen oder Berechnungen durchzuführen.

>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+

#Gleiches gilt für den nächsten
>>> df.select(df.age).show() #Übergeben Sie ein Spaltenobjekt
>>> df.select(df["age"]).show() #Übergeben Sie ein Spaltenobjekt
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
Dataframe Column-Objekt

In Python gibt es zwei Muster für den Zugriff auf das von select übergebene Column-Objekt:

>>> df.age
Column<age>
>>> df["age"]
Column<age>

where/filter

filter(condition)Gibt einen Datenrahmen zurück, der nur aus Zeilen besteht, die die Zeichenfolgenkriterien erfüllen.whereIstfilterEs ist ein Pseudonym von.

>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
+---+----+------+

sort

sort(column)Gibt einen Datenrahmen zurück, der nach der angegebenen Spalte sortiert ist

>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg|    45|
| 30| Bob|    80|
| 35| Ken|  null|
+---+----+------+

limit

limit(n)Gibt einen Datenrahmen zurück, der nur auf die ersten n Zeilen beschränkt ist

>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
+---+----+------+

distinct

distinct()Gibt einen Datenrahmen zurück, der nur aus den unterschiedlichen Ergebniszeilen besteht

>>> df.distinct().count()
3

join

join(dataframe, on, how)wie standard ist inner

--on: Spalte oder Liste der Spalten --how: `" inner "`, "äußere" , " left_outer " , "right_outer" `,` `" leftsemi "` Einer von

Von Dataframe nach RDD konvertieren

Da der DataFrame auf dem RDD basiert, kann das ursprüngliche RDD abgerufen werden.

>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
 Row(age=30, name=u'Bob', weight=80),
 Row(age=29, name=u'Meg', weight=45)]

Um nur eine bestimmte Spalte abzurufen, greifen Sie auf das entsprechende Attribut des `` `Row```-Objekts zu

df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]

Speichern Sie den Datenrahmen

toJson

tojson()Konvertieren Sie in rdd in Form von json. danachsaveastextfileSie können es im JSON-Format speichern, indem Sie aufrufen.

>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

von jetzt an

Spark Streaming und Mllib verwandte Elemente können hier hinzugefügt werden.

Recommended Posts

Spark API Spickzettel
Slack API Anhänge Spickzettel
Curry Spickzettel
SQLite3 Spickzettel
pyenv Spickzettel
conda Befehl Spickzettel
PIL / Kissen Spickzettel
ps Befehl Spickzettel
Python3 Spickzettel (Basic)
PySpark Cheet Sheet [Python]
Python-Spickzettel
Tox Einstellungsdatei Spickzettel
numpy Speicher wiederverwenden Spickzettel
[Python3] Standardeingabe [Cheet Sheet]
Data Science Cheet Sheet (Python)
Python Django Tutorial Cheet Sheet
Scikit lernen Algorithmus Spickzettel
Apache Beam Cheet Sheet [Python]
Persönlicher Spickzettel von Google Test / Mock
CPS-Spickzettel (Continuous Delivery Style)
Python-Spickzettel (für C ++ erfahren)
Curry-Spickzettel [Listenversion des Beschreibungsbeispiels]
AtCoder Spickzettel in Python (für mich)
Blender Python Mesh Datenzugriffsprüfblatt
Cheet Sheet (Python) des Mathematical Optimization Modeler (PuLP)