Ich werde einen Spark-Cluster vorbereiten und einen Beispielcode schreiben. Ich denke, viele Leute verwenden Jupyter als Ausführungsumgebung für die Python-Datenanalyse und das maschinelle Lernen. Der Zweck der Spark-App in Apache Toree besteht darin, auch interaktiv von Jupyter aus zu schreiben. Sie können Jupyter auch mit Scalas REPL verwenden, das über Ihren Browser ausgeführt werden kann.
Spark
Erstellen Sie mit Docker Compose einen Spark-Cluster. Viele Spark Standalone Cluster-Images und docker-compose.yml sind auf Docker Hub und Git Hub verfügbar.
Ich habe einige ausprobiert, aber semantive / spark ist einfach und benutzerfreundlich.
Docker Compose
Die Verwendung des Semantive / Spark-Images wird unter Docker-Images für Apache Spark beschrieben. Docker Hub ist hier und Git Hub ist hier.
Ich habe einige Änderungen an docker-compose.yml im Repository vorgenommen. Die Hauptänderung besteht darin, die öffentliche IP-Adresse der virtuellen Maschine in der Cloud in den Umgebungsvariablen "SPARK_PUBLIC_DNS" und "SPARK_MASTER_HOST" anzugeben, die explizit das Image-Tag angeben, das der Spark-Version entspricht.
docker-compose.yml
version: '2'
services:
master:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
MASTER: spark://master:7077
SPARK_CONF_DIR: /conf
SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
SPARK_MASTER_HOST: <Öffentliche IP-Adresse der virtuellen Maschine>
ports:
- 4040:4040
- 6066:6066
- 7077:7077
- 8080:8080
volumes:
- spark_data:/tmp/data
worker1:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker1
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8881
SPARK_WORKER_WEBUI_PORT: 8081
SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
depends_on:
- master
ports:
- 8081:8081
volumes:
- spark_data:/tmp/data
worker2:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker2
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8882
SPARK_WORKER_WEBUI_PORT: 8082
SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
depends_on:
- master
ports:
- 8082:8082
volumes:
- spark_data:/tmp/data
volumes:
spark_data:
driver: local
Starten Sie Spark Standalone Cluster.
$ docker-compose up -d
Öffnen Sie die Spark Master-Benutzeroberfläche und überprüfen Sie den Status des Clusters.
http://<Öffentliche IP-Adresse der virtuellen Maschine>:8080
Führen Sie die Spark-Shell auf dem Master-Container aus, um die Versionen Scala und Spark anzuzeigen. Die Entwicklung von Spark ist sehr schnell und es treten unerwartete Fehler auf, wenn Sie nicht sorgfältig prüfen, einschließlich der Scala-Version.
$ docker-compose exec master spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Jupyter
Das offizielle jupyter / all-spark-notebook wird für das Jupyter Docker-Image verwendet. Es ist ein Bild, das bis zu Scala und Spark verwendet werden kann.
Apache Toree
Apache Toree ist ein Tool zum Herstellen einer Verbindung zu einem Spark-Cluster von Jupyter. Zusätzlich zu PySpark werden Scala-, SparkR- und SQL-Kernel bereitgestellt.
Wenn Sie sich die [Docker-Datei] ansehen (https://github.com/jupyter/docker-stacks/blob/master/all-spark-notebook/Dockerfile), wird auch Apache Toree installiert.
# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix
docker-compose.yml
Fügen Sie den Jupyter-Dienst zu docker-compose.yml für Spark Standalone Cluster hinzu.
docker-compose.yml
jupyter:
image: jupyter/all-spark-notebook:c1b0cf6bf4d6
depends_on:
- master
ports:
- 8888:8888
volumes:
- ./notebooks:/home/jovyan/work
- ./ivy2:/home/jovyan/.ivy2
env_file:
- ./.env
environment:
TINI_SUBREAPER: 'true'
SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000
Da Spark Standalone Cluster Hadoop nicht verwendet, haben wir dem verteilten Dateisystem eine Einstellung zur Verwendung von Amazon S3 hinzugefügt. Es ist praktisch, es im Speicherziel von Beispieldaten und Parkettdateien zu haben.
image
Das Bild "Jupiter / All-Spark-Notebook" wird häufig aktualisiert. Die von Apache Toree verwendeten Spark- und Spark-Clusterversionen schlagen fehl und werden nicht gestartet. Diesmal ist die Spark-Cluster-Version "2.1.1". Geben Sie daher das Tag für dieselbe Version des Bildes an. Es ist unpraktisch, nur die ID des Tags des Bildes "Jupyter / All-Spark-Notebook" zu kennen.
Da die Version von Spark bereits auf [2.2.0] aktualisiert wurde (https://github.com/jupyter/docker-stacks/commit/c740fbb1ca63db5856e004d29dd08d11fb4f91f8), geben Sie das Tag "2.1.1" an. Ziehen Sie das Docker-Image des Tags und überprüfen Sie es mit "Spark-Shell".
$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
jupyter/all-spark-notebook:c1b0cf6bf4d6 \
/usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
Wir haben bestätigt, dass die Versionen von Spark Cluster und Spark und Scala identisch sind.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Überprüfen Sie auch die Jupyter-Version.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0
Dies sind die beiden Einstellungen, die erforderlich sind, um mit Apache Toree eine Verbindung zu einem Remote-Spark von Jupyter herzustellen. Die Umgebungsvariable TINI_SUBREAPER
verwendet Tini für init.
Wenn Spark keine zusätzlichen Jar-Dateien verwendet, können Sie eine Verbindung zu einem Remote-Spark-Standalone-Cluster herstellen, indem Sie in der Umgebungsvariablen "SPARK_OPTS" Folgendes angeben. Entspricht den normalen Spark-Submit-Optionen.
--master spark://master:7077 --deploy-mode client
Fügen Sie das Flag "--packages" weiter hinzu, wenn Sie zusätzliche Jar-Dateien haben. In diesem Fall ist es das Paket, das für die Verbindung mit Amazon S3 erforderlich ist.
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
--NotebookApp.iopub_data_rate_limit
Geben Sie beim Umgang mit großen Bildern mit Visualisierungstools wie Bokeh die Option des Startskripts von Jupyter an.
--NotebookApp.password
Die Standard-Jupyter-Authentifizierungsmethode ist Token. Ich habe zur Kennwortauthentifizierung gewechselt, da es schwierig ist, jedes Mal ein anderes Token einzufügen, wenn ich häufig wie ein Docker-Container starte und zerstöre. Verwenden Sie ipython, um den Hashwert des Passworts abzurufen.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Das Passwort wird wie folgt generiert. Geben Sie den Ausgabe-Hash-Wert in der Jupyter-Startoption an.
In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: 'sha1:xxx'
volumes
/ home / jovyan
ist das Home-Verzeichnis des Benutzers, der den Jupyter-Container ausführt. Hängen Sie das erstellte Notizbuch oder die heruntergeladene Jar-Datei auf den Docker-Host ein.
env_file
Schreiben Sie Umgebungsvariablen in die Datei ".env" und übergeben Sie sie an den Container. Geben Sie den Zugriffsschlüssel und den geheimen Schlüssel an, mit denen eine Verbindung zu Amazon S3 hergestellt werden soll.
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
Vergessen Sie nicht, es zu .gitignore hinzuzufügen, damit Sie sich nicht auf Git festlegen.
.env
Ich werde ein Beispiel in Scala und Python schreiben, das Spark und Amazon S3 mit Jupyter verwendet. In dem Artikel Überwachen von Echtzeit-Uber-Daten mithilfe von Apache-APIs, Teil 1: Spark Machine Learning Verwenden Sie die Uber-Aufnahmedaten, die Sie als Beispiel verwenden. Lesen Sie hier einfach die CSV-Datei aus S3 und zeigen Sie sie an.
Startet alle in docker-compose.yml definierten Dienste.
$ docker-compose up -d
Öffnen Sie Jupyter in Ihrem Browser und melden Sie sich mit dem zuvor erstellten Passwort an.
http://<Öffentliche IP-Adresse der virtuellen Maschine>:8888
Legen Sie nach dem Klonen des Repositorys die Datei "uber.csv" von "s3cmd" in einen geeigneten Bucket.
$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Eimername>/uber-csv/
Scala
Sie können den Code in Zellen aufteilen und diese interaktiv ausführen, wo Sie den folgenden Code sehen möchten. Um ein Scala-Notizbuch zu schreiben, wählen Sie "Apache Toree - Scala" über die Schaltfläche "Neu" oben rechts.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
getOrCreate()
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("dt", TimestampType, true) ::
StructField("lat", DoubleType, true) ::
StructField("lon", DoubleType, true) ::
StructField("base", StringType, true) :: Nil
)
val df =
spark.read.
option("header", false).
schema(schema).
csv("s3a://<Eimername>/uber-csv/uber.csv")
df.printSchema
df.cache
df.show(false)
Für Scala Der StructType des Schemas kann auch wie folgt geschrieben werden:
val schema = (new StructType).
add("dt", "timestamp", true).
add("lat", "double", true).
add("lon", "double", true).
add("base", "string", true)
Dies ist die endgültige Ausgabe von "df.show (false)".
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Python
Wenn Sie ein Python 3-Notizbuch schreiben, wählen Sie "Python 3" über die Schaltfläche "Neu" oben rechts. Teilen Sie den folgenden Code an geeigneten Stellen in Zellen und führen Sie ihn aus. Der Unterschied zu Scala besteht darin, dass das zusätzliche Jar in der Umgebungsvariablen "PYSPARK_SUBMIT_ARGS" angegeben ist.
Sie können Spark-Apps in Python ähnlich wie in Scala schreiben, wie unten gezeigt.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.getOrCreate()
)
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = StructType([
StructField("dt", TimestampType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("base", StringType(), True)
])
df = (
spark.read
.option("header", False)
.schema(schema)
.csv("s3a://<Eimername>/uber-csv/uber.csv")
)
df.printSchema()
df.cache()
df.show(truncate=False)
Die endgültige Ausgabe von "df.show (truncate = False)" entspricht dem obigen Scala-Code.
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Recommended Posts