Traitement de flux de Python et SensorTag, Kafka, Spark Streaming - Partie 6: Agrégation de fenêtres de PySpark Streaming à partir de Jupyter

Enfin, vous êtes prêt à exécuter le code du titre. Envoyez les données SensorTag à Kafka pour l'agrégation de la fenêtre PySpark Streaming. Utilisez Jupyter comme environnement d'exécution interactif pour Python.

Préparation

Nous utiliserons les scripts Python que nous avons préparés jusqu'à présent et les clusters Kafka et Spark.

Notebook

Exécutez le code de manière interactive dans Jupyter's Notebook pour le voir. Chacun des paragraphes suivants correspond à une cellule. Ouvrez Jupyter à partir d'un navigateur Web et sélectionnez «Python 3» dans le bouton «Nouveau» en haut à droite.

PYSPARK_SUBMIT_ARGS

Le fichier Jar de Scala est assez strict dans la spécification de version. Guide d'intégration Spark Streaming + Kafka indique qu'il existe deux fichiers Jar pour se connecter à Kafka depuis Spark Streaming. Spark-streaming-kafka-0-8 avec la version Kafka 0.8.2.1 ou supérieure ) Et spark-streaming-kafka-0-10 de «0.10» ou supérieur. La version de Kafka que j'utilise cette fois est 0.10.2.1, mais j'utiliserai spark-streaming-kafka-0-8 qui prend en charge Python.

Le nom du package représente chaque version, par exemple spark-streaming-kafka-<Kafka version> _<Scala version>:<Spark version>.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'

Commande magique

Installez le package kafka-python. Utilisez la commande magique de Jupyter (http://ipython.readthedocs.io/en/stable/interactive/magics.html).

!pip install kafka-python

import

Importez les packages requis pour Spark Streaming et Kafka.

import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from kafka import KafkaProducer

Contexte Spark

Depuis Spark 2.x, le contexte de Spark est devenu déroutant, mais j'utiliserai SparkSession.builder comme point d'entrée. StreamingContexts sont créés à des intervalles de lots de 1 minute.

spark = (
    SparkSession
        .builder
        .getOrCreate()
)

sc = spark.sparkContext
ssc = StreamingContext(sc, 60)

Kafka Producer

Définit une liste des courtiers Kafka, des noms de sujets d'entrée et de sortie. Un producteur est également créé pour afficher le résultat de l'agrégation de fenêtres dans la rubrique Kafka.

brokers = "<Adresse IP du courtier Kafka>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"

producer = KafkaProducer(bootstrap_servers=brokers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

Tabulation des fenêtres

C'est la fonction qui effectue le traitement principal de ce script. Créez un DataFrame en appliquant le schéma StructType au RDD Kafka JSON converti. Calculez la valeur moyenne de la température ambiante (ambiante) et de l'humidité (hr) dans une fenêtre de 2 minutes à l'aide de la fonction d'agrégation de fenêtres de DataFrame.

De plus, puisque DataFrame supprime le fuseau horaire au milieu du traitement, le fuseau horaire «Asie / Tokyo» est ajouté afin que le résultat de l'agrégation des fenêtres puisse être facilement compris.

def windowAverage(rdd):
    schema = StructType([
        StructField('ambient', DoubleType(), True),
        StructField('bid', StringType(), True),
        StructField('humidity', DoubleType(), True),
        StructField('objecttemp', DoubleType(), True),
        StructField('rh', DoubleType(), True),
        StructField('time', TimestampType(), True),
    ])
        
    streamingInputDF = spark.createDataFrame(
        rdd, schema=schema
    )

    print('Lot de 1 minute DataFrame')
    streamingInputDF.show(truncate=False)

    averageDF = (
        streamingInputDF
            .groupBy(
                streamingInputDF.bid,
                window("time", "2 minute"))
            .avg("ambient","rh")   
    )
    
    sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0], 
                                            'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(), 
                                            'ambient': x[2], 
                                            'rh': x[3]})
    if not sinkRDD.isEmpty():
        print('Valeur moyenne d'une fenêtre de 2 minutes')
        sinkList = sinkRDD.collect()
        print(sinkList)

        for sink in sinkList:
            producer.send(sinkTopic, sink)

Création de flux Kafka

Spécifie l'adresse IP du courtier Kafka et la rubrique pour envoyer la chaîne JSON SensorTag à partir de Raspberry Pi 3. Désérialisez la chaîne JSON ligne par ligne pyspark.sql.RowCréer un. Le champ time convertit l'horodatage UNIX en datetime Python et supprime le fuseau horaire.

kafkaStream = KafkaUtils.createDirectStream(
    ssc, [sourceTopic], {"metadata.broker.list":brokers})

rowStream = (
    kafkaStream
        .map(lambda line: json.loads(line[1]))
        .map(lambda x: Row(
            ambient=x['ambient'],
            bid=x['bid'],
            humidity=x['humidity'],
            objecttemp=x['objecttemp'],
            rh=x['rh'],
            time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
        )
    )
)

rowStream.foreachRDD(windowAverage)

Démarrer StreamingContext

Enfin, démarrez le StreamingContext et attendez que le programme s'arrête.

ssc.start()
ssc.awaitTermination()

Script de lancement

Exécutez le script Python écrit dans Partie 2 à partir de Raspberry Pi 3.

Résultat de sortie

La sortie suivante sera affichée. Bien qu'il n'y ait pas de fuseau horaire dans la sortie de DataFrame, le fuseau horaire est ajouté au résultat de l'agrégation de fenêtres.

Lot de 1 minute DataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid              |humidity         |objecttemp|rh             |time                 |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.96875  |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.90625  |75.714111328125|2017-08-01 23:44:13.0|
|28.75   |B0:B4:48:BD:DA:03|28.72314453125   |22.875    |75.616455078125|2017-08-01 23:44:23.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.15625  |75.616455078125|2017-08-01 23:44:34.0|
|28.75   |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125  |75.616455078125|2017-08-01 23:44:44.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.125    |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+

Valeur moyenne d'une fenêtre de 2 minutes
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]

Recommended Posts

Traitement de flux de Python et SensorTag, Kafka, Spark Streaming - Partie 6: Agrégation de fenêtres de PySpark Streaming à partir de Jupyter
Traitement de flux de Python et SensorTag, Kafka, Spark Streaming - Partie 1: Raspberry Pi 3
Streaming Python et SensorTag, Kafka, Spark Streaming - Partie 5: Connexion de Jupyter à Spark avec Apache Toree