Je vais préparer un cluster Spark et écrire un exemple de code. Je pense que de nombreuses personnes utilisent Jupyter comme environnement d'exécution pour l'analyse de données Python et l'apprentissage automatique. Le but de l'application Spark dans Apache Toree est également d'écrire de manière interactive à partir de Jupyter. Vous pouvez également utiliser Jupyter avec le REPL de Scala qui peut être exécuté à partir de votre navigateur.
Spark
Créez un cluster Spark avec Docker Compose. De nombreuses images Spark Standalone Cluster et docker-compose.yml sont disponibles sur Docker Hub et Git Hub.
J'en ai essayé, mais semantive / spark est simple et facile à utiliser.
Docker Compose
L'utilisation de l'image semantive / spark
est décrite dans Docker Images For Apache Spark. Docker Hub sera ici et Git Hub sera ici.
J'ai apporté quelques modifications à partir de docker-compose.yml dans le référentiel. Le principal changement est de spécifier l'adresse IP publique de la machine virtuelle sur le cloud dans les variables d'environnement SPARK_PUBLIC_DNS
et SPARK_MASTER_HOST
, qui spécifient explicitement la balise d'image pour correspondre à la version Spark.
docker-compose.yml
version: '2'
services:
master:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
MASTER: spark://master:7077
SPARK_CONF_DIR: /conf
SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
SPARK_MASTER_HOST: <Adresse IP publique de la machine virtuelle>
ports:
- 4040:4040
- 6066:6066
- 7077:7077
- 8080:8080
volumes:
- spark_data:/tmp/data
worker1:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker1
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8881
SPARK_WORKER_WEBUI_PORT: 8081
SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
depends_on:
- master
ports:
- 8081:8081
volumes:
- spark_data:/tmp/data
worker2:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker2
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8882
SPARK_WORKER_WEBUI_PORT: 8082
SPARK_PUBLIC_DNS: <Adresse IP publique de la machine virtuelle>
depends_on:
- master
ports:
- 8082:8082
volumes:
- spark_data:/tmp/data
volumes:
spark_data:
driver: local
Démarrez Spark Standalone Cluster.
$ docker-compose up -d
Ouvrez l'interface utilisateur Spark Master et vérifiez l'état du cluster.
http://<Adresse IP publique de la machine virtuelle>:8080
Exécutez spark-shell sur le conteneur maître pour voir les versions Scala et Spark. Spark est très rapide à développer et vous rencontrerez des erreurs inattendues si vous ne vérifiez pas attentivement, y compris la version Scala.
$ docker-compose exec master spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Jupyter
Le [jupyter / all-spark-notebook] officiel (https://hub.docker.com/r/jupyter/all-spark-notebook/) est utilisé pour l'image Jupyter Docker. C'est une image qui peut être utilisée jusqu'à Scala et Spark.
Apache Toree
Apache Toree est un outil de connexion à un cluster Spark depuis Jupyter. En plus de PySpark, des noyaux Scala, SparkR et SQL sont fournis.
Si vous regardez le Dockerfile, Apache Toree est également installé.
# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix
docker-compose.yml
Ajoutez le service Jupyter à docker-compose.yml pour Spark Standalone Cluster.
docker-compose.yml
jupyter:
image: jupyter/all-spark-notebook:c1b0cf6bf4d6
depends_on:
- master
ports:
- 8888:8888
volumes:
- ./notebooks:/home/jovyan/work
- ./ivy2:/home/jovyan/.ivy2
env_file:
- ./.env
environment:
TINI_SUBREAPER: 'true'
SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000
Étant donné que Spark Standalone Cluster n'utilise pas Hadoop, nous avons ajouté un paramètre pour utiliser Amazon S3 dans le système de fichiers distribué. Il est pratique de l'avoir dans la destination de sauvegarde des exemples de données et des fichiers Parquet.
image
L'image jupyter / all-spark-notebook
est fréquemment mise à jour. Les versions de cluster Spark et Spark utilisées par Apache Toree échoueront et ne démarreront pas. Cette fois, la version du cluster Spark est 2.1.1
, spécifiez donc la balise pour la même version de l'image. jupyter / all-spark-notebook
Il n'est pas pratique de connaître uniquement l'ID de la balise d'image.
Puisque la version de Spark a déjà été mise à niveau vers 2.2.0, spécifiez la balise «2.1.1».
Tirez l'image Docker de la balise et vérifiez-la avec spark-shell
.
$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
jupyter/all-spark-notebook:c1b0cf6bf4d6 \
/usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
Nous avons confirmé que les versions de Spark Cluster et Spark et Scala sont les mêmes.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Vérifiez également la version de Jupyter.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0
Ce sont les deux paramètres requis pour se connecter à un Spark distant depuis Jupyter à l'aide d'Apache Toree. La variable d'environnement TINI_SUBREAPER
utilise Tini pour init.
Si Spark n'utilise pas de fichiers Jar supplémentaires, vous pouvez vous connecter à un cluster Spark Standalone distant simplement en spécifiant ce qui suit dans la variable d'environnement SPARK_OPTS
. Identique aux options normales de soumission d'étincelles.
--master spark://master:7077 --deploy-mode client
Ajoutez le drapeau --packages
plus loin si vous avez des fichiers Jar supplémentaires. Dans ce cas, il s'agit du package requis pour se connecter à Amazon S3.
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
--NotebookApp.iopub_data_rate_limit
Lorsque vous traitez de grandes images avec des outils de visualisation tels que Bokeh, spécifiez l'option du script de démarrage de Jupyter.
--NotebookApp.password
La méthode d'authentification par défaut de Jupyter est le jeton. J'ai changé pour l'authentification par mot de passe car il est gênant d'insérer un jeton différent à chaque fois lors du démarrage et de la destruction fréquemment, comme un conteneur Docker. Utilisez ipython pour obtenir la valeur de hachage du mot de passe.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Le mot de passe est généré comme suit. Spécifiez la valeur de hachage de sortie dans l'option de démarrage Jupyter.
In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: 'sha1:xxx'
volumes
/ home / jovyan
est le répertoire personnel de l'utilisateur exécutant le conteneur Jupyter. Montez le bloc-notes créé ou le fichier Jar téléchargé sur l'hôte Docker.
env_file
Écrivez les variables d'environnement dans le fichier .env
et transmettez-les au conteneur. Spécifiez la clé d'accès et la clé secrète à utiliser pour se connecter à Amazon S3.
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
N'oubliez pas de l'ajouter à .gitignore pour ne pas vous engager dans Git.
.env
Je vais écrire un exemple en Scala et Python qui utilise Spark et Amazon S3 avec Jupyter. Dans l'article Surveillance des données Uber en temps réel à l'aide des API Apache, partie 1: Spark Machine Learning Utilisez les données de collecte Uber que vous utilisez comme exemple. Ici, lisez simplement le fichier CSV depuis S3 et affichez-le.
Démarre tous les services définis dans docker-compose.yml.
$ docker-compose up -d
Ouvrez Jupyter dans votre navigateur et connectez-vous avec le mot de passe que vous avez créé précédemment.
http://<Adresse IP publique de la machine virtuelle>:8888
Après avoir cloné le référentiel, placez le fichier ʻuber.csvde
s3cmd` dans un compartiment approprié.
$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Nom du godet>/uber-csv/
Scala
Vous pouvez diviser le code en cellules et les exécuter de manière interactive là où vous souhaitez voir le code ci-dessous. Pour écrire un cahier Scala, sélectionnez ʻApache Toree --Scala depuis le bouton
Nouveau` en haut à droite.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
getOrCreate()
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("dt", TimestampType, true) ::
StructField("lat", DoubleType, true) ::
StructField("lon", DoubleType, true) ::
StructField("base", StringType, true) :: Nil
)
val df =
spark.read.
option("header", false).
schema(schema).
csv("s3a://<Nom du godet>/uber-csv/uber.csv")
df.printSchema
df.cache
df.show(false)
Pour Scala Le StructType du schéma peut également être écrit comme suit:
val schema = (new StructType).
add("dt", "timestamp", true).
add("lat", "double", true).
add("lon", "double", true).
add("base", "string", true)
Ceci est la sortie finale de df.show (false)
.
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Python
Lors de l'écriture d'un bloc-notes Python 3, sélectionnez «Python 3» dans le bouton «Nouveau» en haut à droite. Divisez le code suivant en cellules aux points appropriés et exécutez-le. La différence avec Scala est que le Jar supplémentaire est spécifié dans la variable d'environnement PYSPARK_SUBMIT_ARGS
.
Vous pouvez écrire des applications Spark en Python de la même manière que Scala, comme indiqué ci-dessous.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.getOrCreate()
)
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = StructType([
StructField("dt", TimestampType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("base", StringType(), True)
])
df = (
spark.read
.option("header", False)
.schema(schema)
.csv("s3a://<Nom du godet>/uber-csv/uber.csv")
)
df.printSchema()
df.cache()
df.show(truncate=False)
La sortie finale de df.show (truncate = False)
est la même que le code Scala ci-dessus.
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Recommended Posts