Streaming Python et SensorTag, Kafka, Spark Streaming - Partie 5: Connexion de Jupyter à Spark avec Apache Toree

Je vais préparer un cluster Spark et écrire un exemple de code. Je pense que de nombreuses personnes utilisent Jupyter comme environnement d'exécution pour l'analyse de données Python et l'apprentissage automatique. Le but de l'application Spark dans Apache Toree est également d'écrire de manière interactive à partir de Jupyter. Vous pouvez également utiliser Jupyter avec le REPL de Scala qui peut être exécuté à partir de votre navigateur.

Spark

Créez un cluster Spark avec Docker Compose. De nombreuses images Spark Standalone Cluster et docker-compose.yml sont disponibles sur Docker Hub et Git Hub.

J'en ai essayé, mais semantive / spark est simple et facile à utiliser.

Docker Compose

L'utilisation de l'image semantive / spark est décrite dans Docker Images For Apache Spark. Docker Hub sera ici et Git Hub sera ici.

J'ai apporté quelques modifications à partir de docker-compose.yml dans le référentiel. Le principal changement est de spécifier l'adresse IP publique de la machine virtuelle sur le cloud dans les variables d'environnement SPARK_PUBLIC_DNS et SPARK_MASTER_HOST, qui spécifient explicitement la balise d'image pour correspondre à la version Spark.

docker-compose.yml


version: '2'
services:
  master:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
      SPARK_MASTER_HOST: <Adresse IP publique de la machine virtuelle>
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - spark_data:/tmp/data

  worker1:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
    depends_on:
      - master
    ports:
      - 8081:8081
    volumes:
      - spark_data:/tmp/data

  worker2:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8882
      SPARK_WORKER_WEBUI_PORT: 8082
      SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
    depends_on:
      - master
    ports:
      - 8082:8082
    volumes:
      - spark_data:/tmp/data

volumes:
  spark_data:
    driver: local

Démarrez Spark Standalone Cluster.

$ docker-compose up -d

Ouvrez l'interface utilisateur Spark Master et vérifiez l'état du cluster.

http://<Adresse IP publique de la machine virtuelle>:8080

Exécutez spark-shell sur le conteneur maître pour voir les versions Scala et Spark. Spark est très rapide à développer et vous rencontrerez des erreurs inattendues si vous ne vérifiez pas attentivement, y compris la version Scala.

$ docker-compose exec master spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Jupyter

Le [jupyter / all-spark-notebook] officiel (https://hub.docker.com/r/jupyter/all-spark-notebook/) est utilisé pour l'image Jupyter Docker. C'est une image qui peut être utilisée jusqu'à Scala et Spark.

Apache Toree

Apache Toree est un outil de connexion à un cluster Spark depuis Jupyter. En plus de PySpark, des noyaux Scala, SparkR et SQL sont fournis.

Si vous regardez le Dockerfile, Apache Toree est également installé.

# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix

docker-compose.yml

Ajoutez le service Jupyter à docker-compose.yml pour Spark Standalone Cluster.

docker-compose.yml


  jupyter:
    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
    depends_on:
      - master
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
    env_file:
      - ./.env
    environment:
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000

À propos des options de service Jupyter

Étant donné que Spark Standalone Cluster n'utilise pas Hadoop, nous avons ajouté un paramètre pour utiliser Amazon S3 dans le système de fichiers distribué. Il est pratique de l'avoir dans la destination de sauvegarde des exemples de données et des fichiers Parquet.

image

L'image jupyter / all-spark-notebook est fréquemment mise à jour. Les versions de cluster Spark et Spark utilisées par Apache Toree échoueront et ne démarreront pas. Cette fois, la version du cluster Spark est 2.1.1, spécifiez donc la balise pour la même version de l'image. jupyter / all-spark-notebook Il n'est pas pratique de connaître uniquement l'ID de la balise d'image.

Puisque la version de Spark a déjà été mise à niveau vers 2.2.0, spécifiez la balise «2.1.1».   Tirez l'image Docker de la balise et vérifiez-la avec spark-shell.

$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
  jupyter/all-spark-notebook:c1b0cf6bf4d6 \
  /usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell

Nous avons confirmé que les versions de Spark Cluster et Spark et Scala sont les mêmes.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Vérifiez également la version de Jupyter.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0

TINI_SUBREAPER et SPARK_OPTS

Ce sont les deux paramètres requis pour se connecter à un Spark distant depuis Jupyter à l'aide d'Apache Toree. La variable d'environnement TINI_SUBREAPER utilise Tini pour init.

Si Spark n'utilise pas de fichiers Jar supplémentaires, vous pouvez vous connecter à un cluster Spark Standalone distant simplement en spécifiant ce qui suit dans la variable d'environnement SPARK_OPTS. Identique aux options normales de soumission d'étincelles.

--master spark://master:7077 --deploy-mode client

Ajoutez le drapeau --packages plus loin si vous avez des fichiers Jar supplémentaires. Dans ce cas, il s'agit du package requis pour se connecter à Amazon S3.

--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3

--NotebookApp.iopub_data_rate_limit

Lorsque vous traitez de grandes images avec des outils de visualisation tels que Bokeh, spécifiez l'option du script de démarrage de Jupyter.

--NotebookApp.password

La méthode d'authentification par défaut de Jupyter est le jeton. J'ai changé pour l'authentification par mot de passe car il est gênant d'insérer un jeton différent à chaque fois lors du démarrage et de la destruction fréquemment, comme un conteneur Docker. Utilisez ipython pour obtenir la valeur de hachage du mot de passe.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.

Le mot de passe est généré comme suit. Spécifiez la valeur de hachage de sortie dans l'option de démarrage Jupyter.

In [1]: from notebook.auth import passwd
In [2]: passwd()

Enter password:
Verify password:
Out[2]: 'sha1:xxx'

volumes

/ home / jovyan est le répertoire personnel de l'utilisateur exécutant le conteneur Jupyter. Montez le bloc-notes créé ou le fichier Jar téléchargé sur l'hôte Docker.

env_file

Écrivez les variables d'environnement dans le fichier .env et transmettez-les au conteneur. Spécifiez la clé d'accès et la clé secrète à utiliser pour se connecter à Amazon S3.

AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

N'oubliez pas de l'ajouter à .gitignore pour ne pas vous engager dans Git.

.env

Utilisez Spark et Amazon S3 depuis Jupyter

Je vais écrire un exemple en Scala et Python qui utilise Spark et Amazon S3 avec Jupyter. Dans l'article Surveillance des données Uber en temps réel à l'aide des API Apache, partie 1: Spark Machine Learning Utilisez les données de collecte Uber que vous utilisez comme exemple. Ici, lisez simplement le fichier CSV depuis S3 et affichez-le.

Démarre tous les services définis dans docker-compose.yml.

$ docker-compose up -d

Ouvrez Jupyter dans votre navigateur et connectez-vous avec le mot de passe que vous avez créé précédemment.

http://<Adresse IP publique de la machine virtuelle>:8888

Préparation des données

Après avoir cloné le référentiel, placez le fichier ʻuber.csvdes3cmd` dans un compartiment approprié.

$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Nom du godet>/uber-csv/

Scala

Vous pouvez diviser le code en cellules et les exécuter de manière interactive là où vous souhaitez voir le code ci-dessous. Pour écrire un cahier Scala, sélectionnez ʻApache Toree --Scala depuis le bouton Nouveau` en haut à droite.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("dt", TimestampType, true) ::
    StructField("lat", DoubleType, true) ::
    StructField("lon", DoubleType, true) ::
    StructField("base", StringType, true) :: Nil
)

val df = 
    spark.read.
    option("header", false).
    schema(schema).
    csv("s3a://<Nom du godet>/uber-csv/uber.csv")

df.printSchema

df.cache
df.show(false)

Pour Scala Le StructType du schéma peut également être écrit comme suit:

val schema = (new StructType).
    add("dt", "timestamp", true).
    add("lat", "double", true).
    add("lon", "double", true).
    add("base", "string", true)

Ceci est la sortie finale de df.show (false).

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Python

Lors de l'écriture d'un bloc-notes Python 3, sélectionnez «Python 3» dans le bouton «Nouveau» en haut à droite. Divisez le code suivant en cellules aux points appropriés et exécutez-le. La différence avec Scala est que le Jar supplémentaire est spécifié dans la variable d'environnement PYSPARK_SUBMIT_ARGS.

Vous pouvez écrire des applications Spark en Python de la même manière que Scala, comme indiqué ci-dessous.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .getOrCreate()
)

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)
])

df = (
    spark.read
    .option("header", False)
    .schema(schema)
    .csv("s3a://<Nom du godet>/uber-csv/uber.csv")
)

df.printSchema()

df.cache()
df.show(truncate=False)

La sortie finale de df.show (truncate = False) est la même que le code Scala ci-dessus.

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Recommended Posts

Streaming Python et SensorTag, Kafka, Spark Streaming - Partie 5: Connexion de Jupyter à Spark avec Apache Toree
Traitement de flux de Python et SensorTag, Kafka, Spark Streaming - Partie 6: Agrégation de fenêtres de PySpark Streaming à partir de Jupyter
Traitement de flux de Python et SensorTag, Kafka, Spark Streaming - Partie 1: Raspberry Pi 3
Nombre de mots avec Apache Spark et python (Mac OS X)
Faites fonctionner Jupyter avec l'API REST pour extraire et enregistrer le code Python
Comment exécuter Jupyter et Spark sur Mac avec des paramètres minimaux
Comment créer un environnement d'exécution Python et Jupyter avec VSCode
[Python] Essayez de reconnaître les caractères des images avec OpenCV et pyocr
De Python à l'utilisation de MeCab (et CaboCha)
Fractal pour faire et jouer avec Python
Connexion de python à MySQL sur CentOS 6.4
CentOS 6.4, Python 2.7.3, Apache, mod_wsgi, Django
Portage et modification du solveur de doublets de python2 vers python3.
Lier Python et JavaScript avec le notebook Jupyter
WEB grattage avec python et essayez de créer un nuage de mots à partir des critiques
Grattage de la nourriture avec python et sortie en CSV
MessagePack-Try pour lier Java et Python avec RPC
Afficher de manière interactive des courbes algébriques en Python, Jupyter
[Python] Comment lire les données de CIFAR-10 et CIFAR-100
De la construction d'environnement Python à la construction d'environnement virtuel avec anaconda
Effectuez une recherche Twitter à partir de Python et essayez de générer des phrases avec la chaîne de Markov.
Extraire des images et des tableaux de pdf avec python pour réduire la charge de reporting
Procédure pour charger MNIST avec python et sortie en png
Comment récupérer des données d'image de Flickr avec Python
De Kafka à KSQL - Construction d'environnement facile avec docker
Essayez le fonctionnement de la base de données avec Python et visualisez avec d3
De l'achat d'un ordinateur à l'exécution d'un programme sur python
Recevoir des e-mails de Gmail et étiqueter avec Python3
Étudiez l'échange de données Java et Python avec Apache Arrow
Accès ODBC à SQL Server depuis Linux avec Python
Feuille de route d'apprentissage qui vous permet de développer et de publier des services à partir de zéro avec Python
J'ai créé un serveur avec socket Python et ssl et j'ai essayé d'y accéder depuis le navigateur
[Python / Ruby] Comprendre le code Comment obtenir des données en ligne et les écrire au format CSV
Précautions lors de la saisie à partir de CSV avec Python et de la sortie vers json pour faire exe