[PYTHON] Probieren Sie Apache Spark mit Jupyter Notebook (auf Local Docker) aus

Früher habe ich Spark verwendet, jetzt verwende ich es nicht mehr Ich werde es bald vergessen, also habe ich beschlossen, die Grundlagen zu notieren.

(Da es sich um das Wissen handelt, das Ganze anzuhören, erwarte ich Kommentare und bearbeite Fehleranfragen.)

verwenden

Es wird ein Docker-Image bereitgestellt, auf dem eine Jupyter + PySpark-Umgebung ausgeführt wird. Dies ist praktisch, um es vor Ort zu versuchen: https://hub.docker.com/r/jupyter/pyspark-notebook/

Was ist PySpark? Spark selbst ist Scala, Es gibt eine Geschichte, dass es einen Typen gibt, der mit Python verwendet werden kann, und das ist PySpark.

Es hätte ein Mechanismus sein sollen, um mein Bestes mit IPC zu geben Scala <-> Es gibt auch ein Thema, bei dem die Kosten für die Konvertierung von Python recht hoch sind.

Jetzt verwenden wir:

docker run -it -p 8888:8888 jupyter/pyspark-notebook

Wenn Sie dies tun, wird im Terminal eine URL mit einem Token unter der Nummer 8888 angezeigt. (Um Um auf das Notebook zuzugreifen, ...) Wenn Sie auf die Hauptseite zugreifen, wird die Jupyter-Seite angezeigt. Sie haben eine einfache Umgebung, die Sie mit Jupyter Notebook codieren können.

Home_Page_-_Select_or_create_a_notebook.png

Wählen Sie hier "Notebook: Python3" aus "New", um das Notebook zu öffnen

Untitled_-_Jupyter_Notebook.png

Versuchen

Der Testcode, um festzustellen, ob er funktioniert, stammt aus dem folgenden Beispiel (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):

from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

Ich bin mir bei der "Spark Session" nicht sicher, aber Es ist eine Erkenntnis, dass es wie eine Instanz von Spark selbst ist.

Wenn Sie dies tun und eine Tabelle erhalten, ist dies in Ordnung:

Untitled_-_Jupyter_Notebook.png

Daten verarbeiten

Lassen Sie uns auf diese Art von Daten abzielen:

id name gender age
1 Satoshi male 10
2 Shigeru male 10
3 Kasumi female 12

Eingabe und Definition

Hier ist eine naive Definition von Daten in Python:

from typing import List, Tuple

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10),
    (2, 'Shigeru', 'male',   10),
    (3, 'Kasumi', 'female', 12),
]

Der Typ jeder Zeile ist "Tuple [int, str, str, int]" in Pythons "Typisierung".

Und Spark hat auch eine Schemadefinition:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

Jetzt können Sie das Spaltenschema auf der Spark-Seite definieren.

So konvertieren Sie Python-definierte Daten in den "DataFrame" von Spark:

from pyspark.sql import DataFrame

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

Sie haben jetzt einen "DataFrame" namens "trainers_df".

Da es aus CSV, MySQL usw. als Datenquelle gelesen werden kann, In der Realität wird es aus einer solchen Datenquelle gelesen und nicht im Code definiert. (In einigen Fällen muss JDBC oder Hadoop festgelegt werden, was später beschrieben wird.)

Wenn Sie dies ablegen und sehen möchten:

trainers_df.show()

Dadurch werden einige Zeilen formatierten Textes in der Tabelle ausgegeben:

Untitled_-_Jupyter_Notebook.png

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|Satoshi|  male| 10|
|  2|Shigeru|  male| 10|
|  3|Kasumi|female| 12|
+---+------+------+---+

Aggregation und Ausgabe

Um den Wert anstelle des Speicherauszugs zu erhalten, führen Sie einfach .collect () aus:

result = trainers_df.collect()
print(result)

Exportieren Sie beim Exportieren in CSV "DataFrame" in dieser Atmosphäre:

trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

Wie bei der Eingabe gibt es verschiedene andere Ausgabeziele wie S3, MySQL und Elasticsearch.

.coalesce (1) sind die Daten, die für jede Partition aufgeteilt werden. Es soll zu einer Partition verschmelzen. Wenn Sie dies nicht tun, wird die CSV beim Teilen ausgegeben.

Verwenden des Befehls hdfs von Hadoop Es gibt auch eine Möglichkeit, die geteilten zusammenzubringen.

Es ist im Grunde eine Verzögerungsbewertung Da es nur durch Ausführen einer Operation wie ".collect ()" ausgewertet wird Sie sollten nicht so oft aggregieren.

Basic

Dies allein macht keinen Sinn, nur um es anzuzeigen. Lassen Sie uns also etwas Passendes tun:

trainers_df.createOrReplaceTempView('trainers');

male_trainers_df = spark.sql('''
    SELECT *
    FROM   trainers
    WHERE  gender = 'male'
''')
male_trainers_df.show()

Dies ergibt dieses Ergebnis:

id name gender age
1 Satoshi male 10
2 Shigeru male 10

DataFrame.createOrReplaceTempView (name) ist DataFrame, Es kann als temporäre SQL-Ansicht registriert werden.

Jetzt können Sie den DF des Ergebnisses der SQL-Operation für die mit "spark.sql (query)" registrierte Ansicht abrufen. Auf diese Weise können Sie Spark ohne zu zögern mit dem gewohnten SQL verwenden. Die Magie ist, dass sowohl psychologische Barrieren als auch Lernkosten niedrig sind.

Sie können den Code auch als "DataFrame" schreiben, ohne ihn in View zu registrieren:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')

Es gibt Fälle, in denen dies einfacher zu verwenden ist, daher von Fall zu Fall.

Anwendung

Da Sie SQL verwenden können, gibt es kein Problem mit grundlegenden Operationen. Wenn Sie Spark verwenden möchten, möchten Sie meistens einige benutzerdefinierte Vorgänge ausführen.

Zum Beispiel wollte ich das in der Vergangenheit tun Es gibt eine Sache namens "morphologische Analyse des Artikeltextes und separates Schreiben". Dies ist mit SQL allein nur schwer zu erreichen.

Da es jedoch MeCab auf Python gibt, Wenn Sie eine morphologische Analyse mit der MeCab-Bibliothek durchführen, wird diese ohne nachzudenken zerlegt Auch wenn Sie wie ich überhaupt nicht verstehen, können Sie es vorerst einfach auf MeCab werfen.

Wie kann ich das für DataFrame auf Spark machen? Es ist gut, UDF (User-Defined Function) zu definieren.

(* Es gibt eine Technik, mit der Sie "Lambda" direkt auf RDD anstelle von "DataFrame" anwenden können. Dies hat eine schlechte Leistung).

Nehmen Sie zum Definieren einer UDF die folgende Definition vor:

from pyspark.sql.functions import udf

@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'Kun', 'female': 'Herr.'}.get(gender, 'Herr')

spark.udf.register('name_with_suffix', name_with_suffix)

Durch Anwenden des Dekorators "@udf (ReturnType)" auf die Funktion, die zur UDF wird Die Funktion kann jetzt als UDF definiert werden. Um es mit Spark SQL zu verwenden, registrieren Sie es mit "spark.udf.register (udf_name, udf)" Sie können es so verwenden, wie es ist, für den gleichen Zweck wie COUNT ().

Übrigens können Sie die vorhandene Funktion mit udf_fn = udf (fn) anwenden, ohne den Dekorator zu verwenden.

Das in diesem Beispiel angegebene hängt vom Geschlecht ab. Es wird ein Suffix hinzugefügt, das "Geschlecht" zu "Name" entspricht. Wenden wir diese Funktion als UDF an:

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender)
    FROM   trainers
''')
dearest_trainers.show()

Das Ergebnis ist:

name_with_suffix(name, gender)
Satoshi-Kun
Shigeru
Kasumi

In diesem Beispiel gibt es eine Meinung, dass Sie sogar in SQL mit "CASE" schreiben können, aber das ist richtig.

Dies kann nützlich sein, je nachdem, was Sie tun möchten.

UDF

Die oben erwähnte morphologische Analyse wird übrigens durchgeführt und aufgeteilt. Dies wäre eine solche Funktion als Bild (Eigentlich benutze ich MeCab cool):

import re

#Halbe Grösse/Teilen Sie die Zeichenfolge durch Leerzeichen und Kontrakte in voller Breite
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[  !…]+', text)
        if len(word) > 0
    ]

Es ist in Ordnung, dies so anzuwenden, wie es ist. Schreiben wir den Beispielcode erneut, während wir die Daten ändern:

Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10, 'Holen Sie sich Pokemon'),
    (2, 'Shigeru', 'male',   10, 'Das ist das Beste von allem! Es bedeutet stark zu sein!'),
    (3, 'Kasumi', 'female', 12, 'Meine Politik ist ... zumindest bei Pokémon vom Typ Mizu ... zumindest!'),
]

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');

wakachi_trainers_df = spark.sql('''
    SELECT id, name, wakachi(comment)
    FROM   trainers
''')
wakachi_trainers_df.show()

Der Punkt hier ist Diesmal empfängt UDF "str" und erweitert es als "List [str]". Wenn ich das versuche, sieht es so aus:

id name wakachi(comment)
1 Satoshi [Pokémon,erhalten,Korrekt]
2 Shigeru [Das ich,In der Welt,ich...
3 Kasumi [Meine,Politik,Mizu...

Die erweiterten Zellen befinden sich in einer Liste Es befindet sich in einem verschachtelten Zustand mit mehr Zeilen innerhalb der Zeilen.

Was ist, wenn Sie dies als Spalte für jedes "str" erweitern möchten? Sie können weitere Funktionen zum Erweitern anwenden:

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)

from pyspark.sql.functions import explode

wakachi_trainers_df = spark.sql('''
    SELECT id, name, explode(wakachi(comment))
    FROM   trainers
''')
wakachi_trainers_df.show()

Da gibt es eine Funktion namens "explodieren" Wenn Sie dies anwenden, werden die verschachtelten Elemente als ihre jeweiligen Spalten erweitert:

id name col
1 Satoshi Pokémon
1 Satoshi erhalten
1 Satoshi Korrekt
2 Shigeru Das ich
2 Shigeru In der Welt
2 Shigeru die meisten
2 Shigeru Stark
2 Shigeru Das ist es
3 Kasumi Meine
3 Kasumi Politik
3 Kasumi Mizu
3 Kasumi Art
3 Kasumi Mit Pokemon
3 Kasumi wenigstens
3 Kasumi wenigstens
3 Kasumi Spree
3 Kasumi Das

beitreten

Als weiteren Punkt können Sie einen "JOIN" zwischen "DataFrame" erstellen. Geben Sie die Spalte an, die auf die gleiche Weise wie "JOIN" zum Verbinden verwendet werden soll, z. B. "MySQL". Basierend darauf wird "DataFrame" kombiniert.

Fügen wir weiteren Beispielcode hinzu und verwenden Sie "JOIN":

Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'Pikachu', 99),
    (2, 1, 'Eidechse', 99),
    (3, 2, 'Evey',   50),
    (4, 3, 'Tosakinto', 20),
    (5, 3, 'Seestern', 30),
    (6, 3, 'Star Me', 40),
]
pkmns_schema = StructType([
    StructField('id',         IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name',       StringType(),  True),
    StructField('level',      IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');

trainer_and_pkmns_df = spark.sql('''
    SELECT     *
    FROM       trainers
    INNER JOIN pkmns
          ON   trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id name gender age comment id trainer_id name level
1 Satoshi male 10 Holen Sie sich Pokemon 1 1 Pikachu 99
1 Satoshi male 10 Holen Sie sich Pokemon 2 1 Eidechse 99
3 Kasumi female 12 Meine Politik ist ... Mizuta... 4 3 Tosakinto 20
3 Kasumi female 12 Meine Politik ist ... Mizuta... 5 3 Seestern 30
3 Kasumi female 12 Meine Politik ist ... Mizuta... 6 3 Star Me 40
2 Shigeru male 10 Ich bin der Beste... 3 2 Evey 50

Übrigens gibt es viele andere Typen als "INNER JOIN" und "OUTER JOIN". Dieser Artikel ist leicht zu verstehen, daher werde ich ihn zitieren:

https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740

Dies ist praktisch, da Sie damit kollektive Operationen ausführen können.

Das Konzept jedes "JOIN" wird zitiert, da das Ben-Diagramm auf dieser Seite leicht zu verstehen ist:

https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690

Als Punkt ist "JOIN" immer noch teuer und langsam. Wenn Sie einen Cluster gebildet haben, werden anscheinend Vorgänge ausgeführt, z. B. das Auffinden anhand der an verschiedenen Orten verteilten Daten, "JOIN" und das Zurückgeben.

Daher ist eine Leistungsoptimierung erforderlich, die später beschrieben wird.

Performance

In der realen Welt kann das Ringen mit riesigen Datenmengen eine entmutigende Aufgabe sein. Denn wenn es ungefähr 4 Stunden dauert und gegen Ende fällt, muss es erneut wiederholt werden. Wenn Sie zweimal einen Fehler machen, haben Sie die Geschäftszeiten eines Tages aufgewendet und Überstunden werden bestätigt.

Um diese Leistung zu verbessern, haben wir die Daten reduziert, um die Effizienz von "JOIN" zu erhöhen. Ändern Sie die Partitionierungsmethode. Es muss so konzipiert werden, dass die Partitionen im Cluster nicht so weit wie möglich fragmentiert werden.

Broadcast Join ist eine Methode, um mutig doppelte Datensätze in allen Clustern zu platzieren. Es gibt auch Dinge wie die Senkung der Kosten für die Suche in einem Datensatz bei JOIN.

Als wichtige Technik Indem Sie DataFrame an jedem Prüfpunkt auf ".cache ()" setzen, In einigen Fällen wird die Leistung erheblich verbessert.

Wenn Sie sich die offizielle Seite über Leistung ansehen, gibt es eine solche Technik, die hilfreich sein wird:

https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries

MySQL

Nun, es ist üblich, aus einer MySQL-Datenbank lesen und diese loswerden zu wollen. In diesem Fall benötigen Sie einen JDBC-MySQL-Connector, um mit MySQL arbeiten zu können. Der Eintrag dieser Person und ihr Docker-Bild sind hilfreich:

Es gibt jedoch einige Dinge, die MySQL mit Spark nur schwer zu handhaben ist. (Es gibt verschiedene Suchtpunkte)

Tatsächlich

Spark ist mächtig:

Ich denke, das ist.

Außerdem ist Spark der Schlüssel zum Erstellen von Clustern mit mehreren Einheiten und zum Erledigen der Arbeit durch Mitarbeiter. In Wirklichkeit scheint es besser zu sein, Amazon EMR oder AWS Glue zu verwenden, da es AWS überlassen bleibt. Dies liegt daran, dass es, wenn es lokal ist, ohne Erstellen eines Clusters funktioniert. Selbst wenn Sie eine große Menge seriöser Daten eingeben, wird keine Leistung erbracht und Sie werden nicht davon profitieren.

Sie stoßen an die Speichergrenze, Selbst wenn Sie Geld sparen können, dauert es zwei Wochen, bis der gesamte Prozess stapelweise durchläuft, wenn es sich um große Datenmengen handelt. Selbst einfache Dinge können möglich sein, wenn Sie sie selbst aufteilen und in mehrere Prozesse aufteilen und ausführen. Es ist eine gute Idee, es Spark zu überlassen, wenn es kann.

Recommended Posts

Probieren Sie Apache Spark mit Jupyter Notebook (auf Local Docker) aus
Versuchen Sie, Jupyter Notebook auf einem Mac auszuführen
Verwenden von Apache Spark mit Jupyter Notebook (IPython Notebook)
Probieren Sie SVM mit scikit-learn auf Jupyter Notebook aus
Probieren Sie grundlegende Operationen mit Pandas DataFrame auf Jupyter Notebook aus
EC2-Bereitstellung mit Vagrant + Jupyter (IPython Notebook) auf Docker
Versuchen Sie, Jupyter Notebook dynamisch zu verwenden
Hohe Charts im Jupyter-Notizbuch
PDF auf Jupyter Notebook anzeigen
Führen Sie IPython Notebook auf Docker aus
Führen Sie Jupyter Notebook unter Windows aus
Tensorboard im Jupyter-Notebook auf Docker kann nicht angezeigt werden (gelöst)
Versuchen Sie, Tensorflow auf Docker + Anaconda auszuführen
Erste Schritte mit Docker Apache Hadoop
Starten Sie das Jupyter Notebook ~ Esper-Training
Löse verstümmelte japanische Zeichen in matplotlib von Jupyter Notebook auf Docker
Machen Sie Jupyter Notebook zu einem Dienst unter CentOS
Verwenden Sie BigQuery von Ihrem lokalen Jupyter Notebook
Klonen Sie das Github-Repository auf dem Jupyter-Notizbuch
PC-GPU-Prüfung am Jupyter-Notebook
Histogramm / Streudiagramm auf Jupyter Notebook anzeigen
Erstellen Sie ein Jupyter-Notebook auf einem Remote-Server (CentOS).
Verwenden Sie vim-Tastenkombinationen in Jupyter Notebook, das mit Docker gestartet wurde
Führen Sie Jupyter Notebook auf einem Remote-Server aus
Installieren Sie matplotlib und zeigen Sie das Diagramm in Jupyter Notebook an
Erstellen Sie eine LAMP-Umgebung auf Ihrem lokalen Docker
Probieren Sie das Zustandsraummodell aus (Jupyter Notebook + IR-Kernel).
[Jupyter Notebook / Lab] Drei Möglichkeiten zum Debuggen auf Jupyter [Pdb]
Erstellen einer Analyseumgebung mit Docker (Jupyter Notebook + PostgreSQL)
Aktivieren Sie Jupyter Notebook mit conda auf dem Remote-Server
Versuchen Sie, die virtuelle Umgebung von conda mit Jupyter Notebook zu verwenden
[Pythonocc] Ich habe versucht, CAD auf einem Jupyter-Notebook zu verwenden
Einfache Anzeige des Liniendiagramms auf dem Jupyter Notebook
Öffnen Sie das auf dem Server gestartete Jupyter-Notizbuch aus der Ferne
Versuchen Sie es mit dem Jupyter Notebook von Azure Machine Learning
Jupyter Notebook beginnt nicht mit Fisch auf dem Mac
Die Geschichte vom Starten des Jupyter-Notizbuchs von python2.x mit Docker (am Samstag und Sonntag zerquetscht)
Pykintone auf Docker
Jupyter Notizbuch Memo
Einführung in Jupyter Notebook
Leistungsstarkes Jupyter-Notizbuch
Golang mit Jupyter
Jupyter auf AWS
Jupyter Notebook Passwort
Aufbau einer virtuellen Umgebung mit Docker + Flask (Python) + Jupyter-Notebook
Starten Sie das Jupyter Notebook ganz einfach unter AWS und greifen Sie lokal zu
Führen Sie Tensorflow von Jupyter Notebook unter Bash unter Ubuntu unter Windows aus
[Windows] [Python3] Installieren Sie Python3 und Jupyter Notebook (ehemals Ipython Notebook) unter Windows
So zeigen Sie den Fortschrittsbalken auf dem Jupyter-Notizbuch an, um den Fortschritt anzuzeigen