Streaming von Python und SensorTag, Kafka, Spark Streaming-Teil 5: Verbindung von Jupyter zu Spark mit Apache Toree

Ich werde einen Spark-Cluster vorbereiten und einen Beispielcode schreiben. Ich denke, viele Leute verwenden Jupyter als Ausführungsumgebung für die Python-Datenanalyse und das maschinelle Lernen. Der Zweck der Spark-App in Apache Toree besteht darin, auch interaktiv von Jupyter aus zu schreiben. Sie können Jupyter auch mit Scalas REPL verwenden, das über Ihren Browser ausgeführt werden kann.

Spark

Erstellen Sie mit Docker Compose einen Spark-Cluster. Viele Spark Standalone Cluster-Images und docker-compose.yml sind auf Docker Hub und Git Hub verfügbar.

Ich habe einige ausprobiert, aber semantive / spark ist einfach und benutzerfreundlich.

Docker Compose

Die Verwendung des Semantive / Spark-Images wird unter Docker-Images für Apache Spark beschrieben. Docker Hub ist hier und Git Hub ist hier.

Ich habe einige Änderungen an docker-compose.yml im Repository vorgenommen. Die Hauptänderung besteht darin, die öffentliche IP-Adresse der virtuellen Maschine in der Cloud in den Umgebungsvariablen "SPARK_PUBLIC_DNS" und "SPARK_MASTER_HOST" anzugeben, die explizit das Image-Tag angeben, das der Spark-Version entspricht.

docker-compose.yml


version: '2'
services:
  master:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
      SPARK_MASTER_HOST: <Öffentliche IP-Adresse der virtuellen Maschine>
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - spark_data:/tmp/data

  worker1:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
    depends_on:
      - master
    ports:
      - 8081:8081
    volumes:
      - spark_data:/tmp/data

  worker2:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8882
      SPARK_WORKER_WEBUI_PORT: 8082
      SPARK_PUBLIC_DNS: <Öffentliche IP-Adresse der virtuellen Maschine>
    depends_on:
      - master
    ports:
      - 8082:8082
    volumes:
      - spark_data:/tmp/data

volumes:
  spark_data:
    driver: local

Starten Sie Spark Standalone Cluster.

$ docker-compose up -d

Öffnen Sie die Spark Master-Benutzeroberfläche und überprüfen Sie den Status des Clusters.

http://<Öffentliche IP-Adresse der virtuellen Maschine>:8080

Führen Sie die Spark-Shell auf dem Master-Container aus, um die Versionen Scala und Spark anzuzeigen. Die Entwicklung von Spark ist sehr schnell und es treten unerwartete Fehler auf, wenn Sie nicht sorgfältig prüfen, einschließlich der Scala-Version.

$ docker-compose exec master spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Jupyter

Das offizielle jupyter / all-spark-notebook wird für das Jupyter Docker-Image verwendet. Es ist ein Bild, das bis zu Scala und Spark verwendet werden kann.

Apache Toree

Apache Toree ist ein Tool zum Herstellen einer Verbindung zu einem Spark-Cluster von Jupyter. Zusätzlich zu PySpark werden Scala-, SparkR- und SQL-Kernel bereitgestellt.

Wenn Sie sich die [Docker-Datei] ansehen (https://github.com/jupyter/docker-stacks/blob/master/all-spark-notebook/Dockerfile), wird auch Apache Toree installiert.

# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix

docker-compose.yml

Fügen Sie den Jupyter-Dienst zu docker-compose.yml für Spark Standalone Cluster hinzu.

docker-compose.yml


  jupyter:
    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
    depends_on:
      - master
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
    env_file:
      - ./.env
    environment:
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000

Informationen zu Jupyter-Serviceoptionen

Da Spark Standalone Cluster Hadoop nicht verwendet, haben wir dem verteilten Dateisystem eine Einstellung zur Verwendung von Amazon S3 hinzugefügt. Es ist praktisch, es im Speicherziel von Beispieldaten und Parkettdateien zu haben.

image

Das Bild "Jupiter / All-Spark-Notebook" wird häufig aktualisiert. Die von Apache Toree verwendeten Spark- und Spark-Clusterversionen schlagen fehl und werden nicht gestartet. Diesmal ist die Spark-Cluster-Version "2.1.1". Geben Sie daher das Tag für dieselbe Version des Bildes an. Es ist unpraktisch, nur die ID des Tags des Bildes "Jupyter / All-Spark-Notebook" zu kennen.

Da die Version von Spark bereits auf [2.2.0] aktualisiert wurde (https://github.com/jupyter/docker-stacks/commit/c740fbb1ca63db5856e004d29dd08d11fb4f91f8), geben Sie das Tag "2.1.1" an.   Ziehen Sie das Docker-Image des Tags und überprüfen Sie es mit "Spark-Shell".

$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
  jupyter/all-spark-notebook:c1b0cf6bf4d6 \
  /usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell

Wir haben bestätigt, dass die Versionen von Spark Cluster und Spark und Scala identisch sind.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Überprüfen Sie auch die Jupyter-Version.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0

TINI_SUBREAPER und SPARK_OPTS

Dies sind die beiden Einstellungen, die erforderlich sind, um mit Apache Toree eine Verbindung zu einem Remote-Spark von Jupyter herzustellen. Die Umgebungsvariable TINI_SUBREAPER verwendet Tini für init.

Wenn Spark keine zusätzlichen Jar-Dateien verwendet, können Sie eine Verbindung zu einem Remote-Spark-Standalone-Cluster herstellen, indem Sie in der Umgebungsvariablen "SPARK_OPTS" Folgendes angeben. Entspricht den normalen Spark-Submit-Optionen.

--master spark://master:7077 --deploy-mode client

Fügen Sie das Flag "--packages" weiter hinzu, wenn Sie zusätzliche Jar-Dateien haben. In diesem Fall ist es das Paket, das für die Verbindung mit Amazon S3 erforderlich ist.

--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3

--NotebookApp.iopub_data_rate_limit

Geben Sie beim Umgang mit großen Bildern mit Visualisierungstools wie Bokeh die Option des Startskripts von Jupyter an.

--NotebookApp.password

Die Standard-Jupyter-Authentifizierungsmethode ist Token. Ich habe zur Kennwortauthentifizierung gewechselt, da es schwierig ist, jedes Mal ein anderes Token einzufügen, wenn ich häufig wie ein Docker-Container starte und zerstöre. Verwenden Sie ipython, um den Hashwert des Passworts abzurufen.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.

Das Passwort wird wie folgt generiert. Geben Sie den Ausgabe-Hash-Wert in der Jupyter-Startoption an.

In [1]: from notebook.auth import passwd
In [2]: passwd()

Enter password:
Verify password:
Out[2]: 'sha1:xxx'

volumes

/ home / jovyan ist das Home-Verzeichnis des Benutzers, der den Jupyter-Container ausführt. Hängen Sie das erstellte Notizbuch oder die heruntergeladene Jar-Datei auf den Docker-Host ein.

env_file

Schreiben Sie Umgebungsvariablen in die Datei ".env" und übergeben Sie sie an den Container. Geben Sie den Zugriffsschlüssel und den geheimen Schlüssel an, mit denen eine Verbindung zu Amazon S3 hergestellt werden soll.

AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

Vergessen Sie nicht, es zu .gitignore hinzuzufügen, damit Sie sich nicht auf Git festlegen.

.env

Verwenden Sie Spark und Amazon S3 von Jupyter

Ich werde ein Beispiel in Scala und Python schreiben, das Spark und Amazon S3 mit Jupyter verwendet. In dem Artikel Überwachen von Echtzeit-Uber-Daten mithilfe von Apache-APIs, Teil 1: Spark Machine Learning Verwenden Sie die Uber-Aufnahmedaten, die Sie als Beispiel verwenden. Lesen Sie hier einfach die CSV-Datei aus S3 und zeigen Sie sie an.

Startet alle in docker-compose.yml definierten Dienste.

$ docker-compose up -d

Öffnen Sie Jupyter in Ihrem Browser und melden Sie sich mit dem zuvor erstellten Passwort an.

http://<Öffentliche IP-Adresse der virtuellen Maschine>:8888

Datenaufbereitung

Legen Sie nach dem Klonen des Repositorys die Datei "uber.csv" von "s3cmd" in einen geeigneten Bucket.

$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Eimername>/uber-csv/

Scala

Sie können den Code in Zellen aufteilen und diese interaktiv ausführen, wo Sie den folgenden Code sehen möchten. Um ein Scala-Notizbuch zu schreiben, wählen Sie "Apache Toree - Scala" über die Schaltfläche "Neu" oben rechts.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("dt", TimestampType, true) ::
    StructField("lat", DoubleType, true) ::
    StructField("lon", DoubleType, true) ::
    StructField("base", StringType, true) :: Nil
)

val df = 
    spark.read.
    option("header", false).
    schema(schema).
    csv("s3a://<Eimername>/uber-csv/uber.csv")

df.printSchema

df.cache
df.show(false)

Für Scala Der StructType des Schemas kann auch wie folgt geschrieben werden:

val schema = (new StructType).
    add("dt", "timestamp", true).
    add("lat", "double", true).
    add("lon", "double", true).
    add("base", "string", true)

Dies ist die endgültige Ausgabe von "df.show (false)".

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Python

Wenn Sie ein Python 3-Notizbuch schreiben, wählen Sie "Python 3" über die Schaltfläche "Neu" oben rechts. Teilen Sie den folgenden Code an geeigneten Stellen in Zellen und führen Sie ihn aus. Der Unterschied zu Scala besteht darin, dass das zusätzliche Jar in der Umgebungsvariablen "PYSPARK_SUBMIT_ARGS" angegeben ist.

Sie können Spark-Apps in Python ähnlich wie in Scala schreiben, wie unten gezeigt.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .getOrCreate()
)

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)
])

df = (
    spark.read
    .option("header", False)
    .schema(schema)
    .csv("s3a://<Eimername>/uber-csv/uber.csv")
)

df.printSchema()

df.cache()
df.show(truncate=False)

Die endgültige Ausgabe von "df.show (truncate = False)" entspricht dem obigen Scala-Code.

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Recommended Posts

Streaming von Python und SensorTag, Kafka, Spark Streaming-Teil 5: Verbindung von Jupyter zu Spark mit Apache Toree
Stream-Verarbeitung von Python und SensorTag, Kafka, Spark Streaming - Teil 6: Fensteraggregation von PySpark Streaming von Jupyter
Stream-Verarbeitung von Python und SensorTag, Kafka, Spark Streaming - Teil 1: Raspberry Pi 3
Word Count mit Apache Spark und Python (Mac OS X)
Führen Sie Jupyter mit der REST-API aus, um Python-Code zu extrahieren und zu speichern
So führen Sie Jupyter und Spark auf einem Mac mit minimalen Einstellungen aus
So erstellen Sie eine Python- und Jupyter-Ausführungsumgebung mit VSCode
[Python] Versuchen Sie, Zeichen aus Bildern mit OpenCV und pyocr zu erkennen
Von Python bis zur Verwendung von MeCab (und CaboCha)
Fraktal zum Erstellen und Spielen mit Python
Herstellen einer Verbindung von Python zu MySQL unter CentOS 6.4
CentOS 6.4, Python 2.7.3, Apache, mod_wsgi, Django
Portieren und Ändern des Doublet-Solvers von Python2 auf Python3.
Verknüpfung von Python und JavaScript mit dem Jupiter-Notizbuch
WEB Scraping mit Python und versuchen, aus Bewertungen eine Wortwolke zu machen
Kratzen Sie das Essen mit Python und geben Sie es an CSV aus
MessagePack-Versuchen Sie, Java und Python mit RPC zu verbinden
Interaktive Anzeige algebraischer Kurven in Python, Jupyter
[Python] Lesen von Daten aus CIFAR-10 und CIFAR-100
Von der Python-Umgebungskonstruktion zur virtuellen Umgebungskonstruktion mit Anaconda
Führen Sie eine Twitter-Suche in Python durch und versuchen Sie, Sätze mit der Markov-Kette zu generieren.
Extrahieren Sie Bilder und Tabellen mit Python aus PDF, um die Berichtslast zu verringern
Prozedur zum Laden von MNIST mit Python und zur Ausgabe an png
So kratzen Sie Bilddaten von Flickr mit Python
Von Kafka bis KSQL - Einfache Umgebungskonstruktion mit Docker
Probieren Sie die DB-Operation mit Python aus und visualisieren Sie sie mit d3
Vom Kauf eines Computers bis zur Ausführung eines Programms auf Python
Erhalten Sie E-Mails von Google Mail und beschriften Sie sie mit Python3
Untersuchen Sie den Java- und Python-Datenaustausch mit Apache Arrow
ODBC-Zugriff auf SQL Server von Linux mit Python
Lern-Roadmap, mit der Sie Services mit Python von Grund auf neu entwickeln und veröffentlichen können
Ich habe einen Server mit Python-Socket und SSL erstellt und versucht, über den Browser darauf zuzugreifen
[Python / Ruby] Mit Code verstehen Wie man Daten aus dem Internet abruft und in CSV schreibt
Vorsichtsmaßnahmen bei der Eingabe von CSV mit Python und der Ausgabe an json, um exe zu erstellen