Ich möchte die häufig verwendete API von Spark (hauptsächlich für mich selbst) notieren, damit sie auch bei der ersten Entwicklung seit langer Zeit schnell verwendet werden kann. Ich werde die Python-Version vorerst zusammenfassen (ich kann die Scala-Version hinzufügen, wenn ich Zeit habe)
** Dieser Spickzettel ist nur ein Spickzettel ** (Argumente können weggelassen werden). Wenn Sie also Zeit haben, stellen Sie bitte sicher, dass [Offizielles API-Dokument (Spark Python API Docs)](http: //spark.apache). Siehe org / docs / latest / api / python / index.html).
Das Folgende setzt Folgendes voraus
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
RDD
parallelize
sc.parallelize(collection)Erstellen Sie eine RDD aus einer Liste oder einem Taple
```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
textFile
sc.textFile(file)Lesen Sie die Datei. Sie können auch Platzhalter und reguläre Ausdrücke verwenden.
```py
>>> rdd = sc.textFile("./words.txt")
wholeTextFiles
sc.wholeTextFiles(dierctory)Geben Sie den gesamten Inhalt jeder Datei im Verzeichnis in ein Element der RDD ein
```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")
Action
Die Transformation wird zum ersten Mal ausgeführt, wenn die Aktion ausgeführt wird (verzögerte Ausführung).
collect
collect()
Gibt alle Elemente zurück
>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take
take(n)
Gibt zuerst n Elemente zurück
>>> print(rdd.take(3))
[1, 2, 3]
first
first()
Gibt das allererste Element zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top
top(n)
Gibt n Elemente vom größten zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]
count
count()
Zählen Sie die Anzahl der Elemente und geben Sie sie zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean
mean()
Gibt den Durchschnitt zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0
sum
sum()
Gibt die Summe zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance
variance()
Gibt die Verteilung zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev
stdev()
Gibt die Standardabweichung zurück
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726
saveAsTextFile
saveastextfile(file)
Speicher die Datei
>>> rdd.saveAsTextFile("./a.txt")
Transformation
Die Transformation gibt eine neue unveränderliche RDD zurück
filter/map/reduce
filter
filter(f)
Gibt eine rdd zurück, die nur Elemente enthält, für die f wahr ist
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map
map(f)
Gibt rdd zurück, wobei f auf alle Elemente wirkt
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap
flatmap(f)
Nachdem Sie f auf alle Elemente angewendet haben, geben Sie rdd zurück, wodurch die Liste im Element erweitert wurde
>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
Reduce
reduce(f)
Wirken Sie weiterhin auf zwei Elemente, um einen Rückgabewert zu erhalten
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6
Pair RDD ist ein RDD mit Tuple als Element in Python. Kann mit Schlüssel und Wert umgehen.
Verwenden Sie dazu `keyBy``` oder`
map```, um einen Taple mit 2 Elementen an das Element zurückzugeben.
keyBy(PairRDD)
keyby(f)
Lassen Sie f auf ein gewöhnliches Element von rdd einwirken und geben Sie rdd mit seinem Rückgabewert als Schlüssel und dem ursprünglichen Element als Wert zurück.
>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys
keys
Gibt eine rdd zurück, die nur aus den Schlüsseln des Paares rdd besteht
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values
values
Gibt eine rdd zurück, die nur aus dem Wert der gepaarten rdd besteht
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues
flatmapvalues(f)
Wenden Sie flatmap auf den Wert von pairrdd an, um den Schlüssel zu duplizieren und ihn als vertikales Halten zu bezeichnen
>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
('Ken', 'Yukiko'),
('Bob', 'Meg'),
('Bob', ' Tomomi'),
('Bob', ' Akira'),
('Taka', 'Yuki')]
reduceByKey
reducebykey(f)
Nach Elementen mit demselben Schlüssel gruppieren und auf Wert reduzieren anwenden
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey
countbykey()
Zählen Sie, wie viele Werte desselben Schlüssels vorhanden sind, und kehren Sie mit dict zurück
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey
sortbykey
Sortieren Sie das Paar rdd nach Schlüssel
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]
leftOuterJoin Links außen zwei RDDs verbinden und ein Paar RDD mit einem Tupel von zwei Wertelementen zurückgeben
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin Rechts außen zwei RDDs verbinden und ein Paar RDD mit einem Tupel von zwei Wertelementen zurückgeben
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin Vollständige äußere Verknüpfung zweier RDDs und Rückgabe eines RDD-Paares mit einem Tupel aus zwei Wertelementen
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
sortBy
sortby(f)
Sortieren nach dem von f zurückgegebenen Wert
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortBy(lambda (x, y): x).collect() #Gleich wie sortByKey
intersection
intersection(rdd)
Gibt einen Schnittpunkt von zwei rdd zurück
union
union(rdd)
Gibt die Vereinigung von zwei rdd zurück
zip
zip(rdd)
Gibt ein Paar rdd mit jedem Element des Arguments rdd als vlaue zurück
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct Gibt eine RDD zurück, die nicht dieselben Elemente enthält
sample
sample(bool, frac)
Gibt die abgetastete Festplatte zurück. Das erste Argument bestimmt, ob dasselbe Element dupliziert werden kann.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample
takesmaple(bool, size)
Gibt eine Liste von Mustern mit fester Größe zurück. Das erste Argument bestimmt, ob dasselbe Element dupliziert werden kann.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
toDebugString
todebugstring()
Gibt den Ausführungsplan zurück
print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
| MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
| ShuffledRDD[188] at partitionBy at null:-1 []
+-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []
persist
persist()
Cache rdd wie es ist (standardmäßig im Speicher). Sie können nur Speicher, Festplatte, wenn Speicher nicht möglich ist, nur Festplatte usw. einstellen. (storagelevel
Spezifiziert durch)
>>> rdd.persist()
unpersist
unpersist()
Lösen Sie die Persistenz von rdd. Wird beim Ändern der Persistenzstufe verwendet.
>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
Wird jederzeit hinzugefügt
word count
>>> rdd.flatMap(lambda x: x.split())\
.map(lambda x: (x, 1))\
.reduceByKey(lambda x, y: x + y)\
.take(5)
DataFrame Dies ist besonders praktisch, wenn Sie mit strukturierten Daten arbeiten.
read.json
read.json(file)Lesen Sie Daten von json
```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")
Es gibt `show``` zusätzlich zu`
collect```, `take
show
show(n)
N Zeilen anzeigen (n ist standardmäßig 20)
>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
select
select(column)
Gibt den ausgewählten Datenrahmen zurück und übergibt eine Zeichenfolge oder ein Spaltenobjekt. Sie können auch Spalten auflisten, um mehrere Spalten abzurufen oder Berechnungen durchzuführen.
>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+
#Gleiches gilt für den nächsten
>>> df.select(df.age).show() #Übergeben Sie ein Spaltenobjekt
>>> df.select(df["age"]).show() #Übergeben Sie ein Spaltenobjekt
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
In Python gibt es zwei Muster für den Zugriff auf das von select übergebene Column-Objekt:
>>> df.age
Column<age>
>>> df["age"]
Column<age>
where/filter
filter(condition)
Gibt einen Datenrahmen zurück, der nur aus Zeilen besteht, die die Zeichenfolgenkriterien erfüllen.where
Istfilter
Es ist ein Pseudonym von.
>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
+---+----+------+
sort
sort(column)
Gibt einen Datenrahmen zurück, der nach der angegebenen Spalte sortiert ist
>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg| 45|
| 30| Bob| 80|
| 35| Ken| null|
+---+----+------+
limit
limit(n)
Gibt einen Datenrahmen zurück, der nur auf die ersten n Zeilen beschränkt ist
>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
+---+----+------+
distinct
distinct()
Gibt einen Datenrahmen zurück, der nur aus den unterschiedlichen Ergebniszeilen besteht
>>> df.distinct().count()
3
join
join(dataframe, on, how)
wie standard ist inner
--on: Spalte oder Liste der Spalten
--how: `" inner "`
, "äußere"
,
" left_outer "
,
"right_outer" `,` `" leftsemi "` Einer von
Da der DataFrame auf dem RDD basiert, kann das ursprüngliche RDD abgerufen werden.
>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
Row(age=30, name=u'Bob', weight=80),
Row(age=29, name=u'Meg', weight=45)]
Um nur eine bestimmte Spalte abzurufen, greifen Sie auf das entsprechende Attribut des `` `Row```-Objekts zu
df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]
toJson
tojson()
Konvertieren Sie in rdd in Form von json. danachsaveastextfile
Sie können es im JSON-Format speichern, indem Sie aufrufen.
>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
Spark Streaming und Mllib verwandte Elemente können hier hinzugefügt werden.
Recommended Posts