Früher habe ich Spark verwendet, jetzt verwende ich es nicht mehr Ich werde es bald vergessen, also habe ich beschlossen, die Grundlagen zu notieren.
(Da es sich um das Wissen handelt, das Ganze anzuhören, erwarte ich Kommentare und bearbeite Fehleranfragen.)
Es wird ein Docker-Image bereitgestellt, auf dem eine Jupyter + PySpark-Umgebung ausgeführt wird. Dies ist praktisch, um es vor Ort zu versuchen: https://hub.docker.com/r/jupyter/pyspark-notebook/
Was ist PySpark? Spark selbst ist Scala, Es gibt eine Geschichte, dass es einen Typen gibt, der mit Python verwendet werden kann, und das ist PySpark.
Es hätte ein Mechanismus sein sollen, um mein Bestes mit IPC zu geben Scala <-> Es gibt auch ein Thema, bei dem die Kosten für die Konvertierung von Python recht hoch sind.
Jetzt verwenden wir:
docker run -it -p 8888:8888 jupyter/pyspark-notebook
Wenn Sie dies tun, wird im Terminal eine URL mit einem Token unter der Nummer 8888 angezeigt.
(Um Um auf das Notebook zuzugreifen, ...
)
Wenn Sie auf die Hauptseite zugreifen, wird die Jupyter-Seite angezeigt.
Sie haben eine einfache Umgebung, die Sie mit Jupyter Notebook codieren können.
Wählen Sie hier "Notebook: Python3" aus "New", um das Notebook zu öffnen
Der Testcode, um festzustellen, ob er funktioniert, stammt aus dem folgenden Beispiel (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test" as c1').show()
Ich bin mir bei der "Spark Session" nicht sicher, aber Es ist eine Erkenntnis, dass es wie eine Instanz von Spark selbst ist.
Wenn Sie dies tun und eine Tabelle erhalten, ist dies in Ordnung:
Lassen Sie uns auf diese Art von Daten abzielen:
id |
name |
gender |
age |
---|---|---|---|
1 | Satoshi | male | 10 |
2 | Shigeru | male | 10 |
3 | Kasumi | female | 12 |
Hier ist eine naive Definition von Daten in Python:
from typing import List, Tuple
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
(1, 'Satoshi', 'male', 10),
(2, 'Shigeru', 'male', 10),
(3, 'Kasumi', 'female', 12),
]
Der Typ jeder Zeile ist "Tuple [int, str, str, int]" in Pythons "Typisierung".
Und Spark hat auch eine Schemadefinition:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
Jetzt können Sie das Spaltenschema auf der Spark-Seite definieren.
So konvertieren Sie Python-definierte Daten in den "DataFrame" von Spark:
from pyspark.sql import DataFrame
trainers_df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
Sie haben jetzt einen "DataFrame" namens "trainers_df".
Da es aus CSV, MySQL usw. als Datenquelle gelesen werden kann, In der Realität wird es aus einer solchen Datenquelle gelesen und nicht im Code definiert. (In einigen Fällen muss JDBC oder Hadoop festgelegt werden, was später beschrieben wird.)
Wenn Sie dies ablegen und sehen möchten:
trainers_df.show()
Dadurch werden einige Zeilen formatierten Textes in der Tabelle ausgegeben:
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|Satoshi| male| 10|
| 2|Shigeru| male| 10|
| 3|Kasumi|female| 12|
+---+------+------+---+
Um den Wert anstelle des Speicherauszugs zu erhalten, führen Sie einfach .collect ()
aus:
result = trainers_df.collect()
print(result)
Exportieren Sie beim Exportieren in CSV "DataFrame" in dieser Atmosphäre:
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
Wie bei der Eingabe gibt es verschiedene andere Ausgabeziele wie S3, MySQL und Elasticsearch.
.coalesce (1)
sind die Daten, die für jede Partition aufgeteilt werden.
Es soll zu einer Partition verschmelzen.
Wenn Sie dies nicht tun, wird die CSV beim Teilen ausgegeben.
Verwenden des Befehls hdfs
von Hadoop
Es gibt auch eine Möglichkeit, die geteilten zusammenzubringen.
Es ist im Grunde eine Verzögerungsbewertung Da es nur durch Ausführen einer Operation wie ".collect ()" ausgewertet wird Sie sollten nicht so oft aggregieren.
Dies allein macht keinen Sinn, nur um es anzuzeigen. Lassen Sie uns also etwas Passendes tun:
trainers_df.createOrReplaceTempView('trainers');
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
Dies ergibt dieses Ergebnis:
id |
name |
gender |
age |
---|---|---|---|
1 | Satoshi | male | 10 |
2 | Shigeru | male | 10 |
DataFrame.createOrReplaceTempView (name)
ist DataFrame
,
Es kann als temporäre SQL-Ansicht registriert werden.
Jetzt können Sie den DF des Ergebnisses der SQL-Operation für die mit "spark.sql (query)" registrierte Ansicht abrufen. Auf diese Weise können Sie Spark ohne zu zögern mit dem gewohnten SQL verwenden. Die Magie ist, dass sowohl psychologische Barrieren als auch Lernkosten niedrig sind.
Sie können den Code auch als "DataFrame" schreiben, ohne ihn in View zu registrieren:
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
Es gibt Fälle, in denen dies einfacher zu verwenden ist, daher von Fall zu Fall.
Da Sie SQL verwenden können, gibt es kein Problem mit grundlegenden Operationen. Wenn Sie Spark verwenden möchten, möchten Sie meistens einige benutzerdefinierte Vorgänge ausführen.
Zum Beispiel wollte ich das in der Vergangenheit tun Es gibt eine Sache namens "morphologische Analyse des Artikeltextes und separates Schreiben". Dies ist mit SQL allein nur schwer zu erreichen.
Da es jedoch MeCab auf Python gibt, Wenn Sie eine morphologische Analyse mit der MeCab-Bibliothek durchführen, wird diese ohne nachzudenken zerlegt Auch wenn Sie wie ich überhaupt nicht verstehen, können Sie es vorerst einfach auf MeCab werfen.
Wie kann ich das für DataFrame
auf Spark machen?
Es ist gut, UDF (User-Defined Function) zu definieren.
(* Es gibt eine Technik, mit der Sie "Lambda" direkt auf RDD anstelle von "DataFrame" anwenden können. Dies hat eine schlechte Leistung).
Nehmen Sie zum Definieren einer UDF die folgende Definition vor:
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
return name + {'male': 'Kun', 'female': 'Herr.'}.get(gender, 'Herr')
spark.udf.register('name_with_suffix', name_with_suffix)
Durch Anwenden des Dekorators "@udf (ReturnType)" auf die Funktion, die zur UDF wird Die Funktion kann jetzt als UDF definiert werden. Um es mit Spark SQL zu verwenden, registrieren Sie es mit "spark.udf.register (udf_name, udf)" Sie können es so verwenden, wie es ist, für den gleichen Zweck wie COUNT ().
Übrigens können Sie die vorhandene Funktion mit udf_fn = udf (fn)
anwenden, ohne den Dekorator zu verwenden.
Das in diesem Beispiel angegebene hängt vom Geschlecht ab. Es wird ein Suffix hinzugefügt, das "Geschlecht" zu "Name" entspricht. Wenden wir diese Funktion als UDF an:
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender)
FROM trainers
''')
dearest_trainers.show()
Das Ergebnis ist:
name_with_suffix(name, gender) |
---|
Satoshi-Kun |
Shigeru |
Kasumi |
In diesem Beispiel gibt es eine Meinung, dass Sie sogar in SQL mit "CASE" schreiben können, aber das ist richtig.
Dies kann nützlich sein, je nachdem, was Sie tun möchten.
UDF
Die oben erwähnte morphologische Analyse wird übrigens durchgeführt und aufgeteilt. Dies wäre eine solche Funktion als Bild (Eigentlich benutze ich MeCab cool):
import re
#Halbe Grösse/Teilen Sie die Zeichenfolge durch Leerzeichen und Kontrakte in voller Breite
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
return [
word
for word
in re.split('[ !…]+', text)
if len(word) > 0
]
Es ist in Ordnung, dies so anzuwenden, wie es ist. Schreiben wir den Beispielcode erneut, während wir die Daten ändern:
Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
(1, 'Satoshi', 'male', 10, 'Holen Sie sich Pokemon'),
(2, 'Shigeru', 'male', 10, 'Das ist das Beste von allem! Es bedeutet stark zu sein!'),
(3, 'Kasumi', 'female', 12, 'Meine Politik ist ... zumindest bei Pokémon vom Typ Mizu ... zumindest!'),
]
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');
wakachi_trainers_df = spark.sql('''
SELECT id, name, wakachi(comment)
FROM trainers
''')
wakachi_trainers_df.show()
Der Punkt hier ist Diesmal empfängt UDF "str" und erweitert es als "List [str]". Wenn ich das versuche, sieht es so aus:
id |
name |
wakachi(comment) |
---|---|---|
1 | Satoshi | [Pokémon,erhalten,Korrekt] |
2 | Shigeru | [Das ich,In der Welt,ich... |
3 | Kasumi | [Meine,Politik,Mizu... |
Die erweiterten Zellen befinden sich in einer Liste Es befindet sich in einem verschachtelten Zustand mit mehr Zeilen innerhalb der Zeilen.
Was ist, wenn Sie dies als Spalte für jedes "str" erweitern möchten? Sie können weitere Funktionen zum Erweitern anwenden:
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)
from pyspark.sql.functions import explode
wakachi_trainers_df = spark.sql('''
SELECT id, name, explode(wakachi(comment))
FROM trainers
''')
wakachi_trainers_df.show()
Da gibt es eine Funktion namens "explodieren" Wenn Sie dies anwenden, werden die verschachtelten Elemente als ihre jeweiligen Spalten erweitert:
id |
name |
col |
---|---|---|
1 | Satoshi | Pokémon |
1 | Satoshi | erhalten |
1 | Satoshi | Korrekt |
2 | Shigeru | Das ich |
2 | Shigeru | In der Welt |
2 | Shigeru | die meisten |
2 | Shigeru | Stark |
2 | Shigeru | Das ist es |
3 | Kasumi | Meine |
3 | Kasumi | Politik |
3 | Kasumi | Mizu |
3 | Kasumi | Art |
3 | Kasumi | Mit Pokemon |
3 | Kasumi | wenigstens |
3 | Kasumi | wenigstens |
3 | Kasumi | Spree |
3 | Kasumi | Das |
Als weiteren Punkt können Sie einen "JOIN" zwischen "DataFrame" erstellen. Geben Sie die Spalte an, die auf die gleiche Weise wie "JOIN" zum Verbinden verwendet werden soll, z. B. "MySQL". Basierend darauf wird "DataFrame" kombiniert.
Fügen wir weiteren Beispielcode hinzu und verwenden Sie "JOIN":
Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
(1, 1, 'Pikachu', 99),
(2, 1, 'Eidechse', 99),
(3, 2, 'Evey', 50),
(4, 3, 'Tosakinto', 20),
(5, 3, 'Seestern', 30),
(6, 3, 'Star Me', 40),
]
pkmns_schema = StructType([
StructField('id', IntegerType(), True),
StructField('trainer_id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('level', IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
spark.sparkContext.parallelize(pkmns),
pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');
trainer_and_pkmns_df = spark.sql('''
SELECT *
FROM trainers
INNER JOIN pkmns
ON trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id |
name |
gender |
age |
comment |
id |
trainer_id |
name |
level |
---|---|---|---|---|---|---|---|---|
1 | Satoshi | male | 10 | Holen Sie sich Pokemon | 1 | 1 | Pikachu | 99 |
1 | Satoshi | male | 10 | Holen Sie sich Pokemon | 2 | 1 | Eidechse | 99 |
3 | Kasumi | female | 12 | Meine Politik ist ... Mizuta... | 4 | 3 | Tosakinto | 20 |
3 | Kasumi | female | 12 | Meine Politik ist ... Mizuta... | 5 | 3 | Seestern | 30 |
3 | Kasumi | female | 12 | Meine Politik ist ... Mizuta... | 6 | 3 | Star Me | 40 |
2 | Shigeru | male | 10 | Ich bin der Beste... | 3 | 2 | Evey | 50 |
Übrigens gibt es viele andere Typen als "INNER JOIN" und "OUTER JOIN". Dieser Artikel ist leicht zu verstehen, daher werde ich ihn zitieren:
https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740
Dies ist praktisch, da Sie damit kollektive Operationen ausführen können.
Das Konzept jedes "JOIN" wird zitiert, da das Ben-Diagramm auf dieser Seite leicht zu verstehen ist:
https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690
Als Punkt ist "JOIN" immer noch teuer und langsam. Wenn Sie einen Cluster gebildet haben, werden anscheinend Vorgänge ausgeführt, z. B. das Auffinden anhand der an verschiedenen Orten verteilten Daten, "JOIN" und das Zurückgeben.
Daher ist eine Leistungsoptimierung erforderlich, die später beschrieben wird.
In der realen Welt kann das Ringen mit riesigen Datenmengen eine entmutigende Aufgabe sein. Denn wenn es ungefähr 4 Stunden dauert und gegen Ende fällt, muss es erneut wiederholt werden. Wenn Sie zweimal einen Fehler machen, haben Sie die Geschäftszeiten eines Tages aufgewendet und Überstunden werden bestätigt.
Um diese Leistung zu verbessern, haben wir die Daten reduziert, um die Effizienz von "JOIN" zu erhöhen. Ändern Sie die Partitionierungsmethode. Es muss so konzipiert werden, dass die Partitionen im Cluster nicht so weit wie möglich fragmentiert werden.
Broadcast Join ist eine Methode, um mutig doppelte Datensätze in allen Clustern zu platzieren.
Es gibt auch Dinge wie die Senkung der Kosten für die Suche in einem Datensatz bei JOIN
.
Als wichtige Technik Indem Sie DataFrame an jedem Prüfpunkt auf ".cache ()" setzen, In einigen Fällen wird die Leistung erheblich verbessert.
Wenn Sie sich die offizielle Seite über Leistung ansehen, gibt es eine solche Technik, die hilfreich sein wird:
https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries
MySQL
Nun, es ist üblich, aus einer MySQL-Datenbank lesen und diese loswerden zu wollen. In diesem Fall benötigen Sie einen JDBC-MySQL-Connector, um mit MySQL arbeiten zu können. Der Eintrag dieser Person und ihr Docker-Bild sind hilfreich:
Es gibt jedoch einige Dinge, die MySQL mit Spark nur schwer zu handhaben ist. (Es gibt verschiedene Suchtpunkte)
Spark ist mächtig:
Ich denke, das ist.
Außerdem ist Spark der Schlüssel zum Erstellen von Clustern mit mehreren Einheiten und zum Erledigen der Arbeit durch Mitarbeiter. In Wirklichkeit scheint es besser zu sein, Amazon EMR oder AWS Glue zu verwenden, da es AWS überlassen bleibt. Dies liegt daran, dass es, wenn es lokal ist, ohne Erstellen eines Clusters funktioniert. Selbst wenn Sie eine große Menge seriöser Daten eingeben, wird keine Leistung erbracht und Sie werden nicht davon profitieren.
Sie stoßen an die Speichergrenze, Selbst wenn Sie Geld sparen können, dauert es zwei Wochen, bis der gesamte Prozess stapelweise durchläuft, wenn es sich um große Datenmengen handelt. Selbst einfache Dinge können möglich sein, wenn Sie sie selbst aufteilen und in mehrere Prozesse aufteilen und ausführen. Es ist eine gute Idee, es Spark zu überlassen, wenn es kann.
Recommended Posts