Stream-Verarbeitung von Python und SensorTag, Kafka, Spark Streaming - Teil 6: Fensteraggregation von PySpark Streaming von Jupyter

Endlich können Sie den Titelcode ausführen. Senden Sie SensorTag-Daten zur Aggregation von PySpark Streaming-Fenstern an Kafka. Verwenden Sie Jupyter als interaktive Ausführungsumgebung für Python.

Vorbereitung

Wir werden die bisher vorbereiteten Python-Skripte und die Kafka- und Spark-Cluster verwenden.

Notebook

Führen Sie den Code interaktiv in Jupyters Notizbuch aus, um ihn anzuzeigen. Jeder der folgenden Absätze entspricht einer Zelle. Öffnen Sie Jupyter in einem Webbrowser und wählen Sie "Python 3" über die Schaltfläche "Neu" oben rechts.

PYSPARK_SUBMIT_ARGS

Die Jar-Datei von Scala ist in der Versionsspezifikation ziemlich streng. Spark Streaming + Kafka Integrationshandbuch gibt an, dass es zwei Jar-Dateien gibt, um über Spark Streaming eine Verbindung zu Kafka herzustellen. Spark-Streaming-Kafka-0-8 mit Kafka-Version 0.8.2.1 oder höher ) Und spark-Streaming-kafka-0-10 von 0.10 oder höher. Die Version von Kafka, die ich dieses Mal benutze, ist "0.10.2.1", aber ich werde "Spark-Streaming-Kafka-0-8" verwenden, das Python unterstützt.

Der Paketname steht für jede Version, z. B. Spark-Streaming-Kafka-<Kafka-Version> _<Scala-Version>:<Sfunken-Version>.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'

Magischer Befehl

Installieren Sie das Paket kafka-python. Verwenden Sie Jupyters Magic Command (http://ipython.readthedocs.io/en/stable/interactive/magics.html).

!pip install kafka-python

import

Importieren Sie die für Spark Streaming und Kafka erforderlichen Pakete.

import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from kafka import KafkaProducer

Spark-Kontext

Seit Spark 2.x ist der Kontext von Spark verwirrend geworden, aber ich werde SparkSession.builder als Einstiegspunkt verwenden. StreamingContexts werden in Stapelintervallen von 1 Minute erstellt.

spark = (
    SparkSession
        .builder
        .getOrCreate()
)

sc = spark.sparkContext
ssc = StreamingContext(sc, 60)

Kafka Producer

brokers = "<IP-Adresse von Kafka Broker>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"

producer = KafkaProducer(bootstrap_servers=brokers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

Fenstertabelle

Dies ist die Funktion, die die Hauptverarbeitung dieses Skripts ausführt. Erstellen Sie einen DataFrame, indem Sie das StructType-Schema auf das von Kafka JSON konvertierte RDD anwenden. Berechnen Sie den Durchschnittswert von Umgebungstemperatur (Umgebungstemperatur) und Luftfeuchtigkeit (rh) in einem 2-Minuten-Fenster mithilfe der Fensteraggregationsfunktion von DataFrame.

Da DataFrame die Zeitzone während der Verarbeitung löscht, wird die Zeitzone "Asien / Tokio" hinzugefügt, damit das Ergebnis der Fensteraggregation leicht verstanden werden kann.

def windowAverage(rdd):
    schema = StructType([
        StructField('ambient', DoubleType(), True),
        StructField('bid', StringType(), True),
        StructField('humidity', DoubleType(), True),
        StructField('objecttemp', DoubleType(), True),
        StructField('rh', DoubleType(), True),
        StructField('time', TimestampType(), True),
    ])
        
    streamingInputDF = spark.createDataFrame(
        rdd, schema=schema
    )

    print('1 Minute Batch DataFrame')
    streamingInputDF.show(truncate=False)

    averageDF = (
        streamingInputDF
            .groupBy(
                streamingInputDF.bid,
                window("time", "2 minute"))
            .avg("ambient","rh")   
    )
    
    sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0], 
                                            'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(), 
                                            'ambient': x[2], 
                                            'rh': x[3]})
    if not sinkRDD.isEmpty():
        print('Durchschnittswert des 2-Minuten-Fensters')
        sinkList = sinkRDD.collect()
        print(sinkList)

        for sink in sinkList:
            producer.send(sinkTopic, sink)

Kafka Stream Erstellung

Gibt die Kafka Broker-IP-Adresse und das Thema zum Senden der SensorTag-JSON-Zeichenfolge von Raspberry Pi 3 an. Deserialisieren Sie die JSON-Zeichenfolge Zeile für Zeile pyspark.sql.RowEin ... kreieren. Das Feld "Zeit" konvertiert den UNIX-Zeitstempel in Python datetime und entfernt die Zeitzone.

kafkaStream = KafkaUtils.createDirectStream(
    ssc, [sourceTopic], {"metadata.broker.list":brokers})

rowStream = (
    kafkaStream
        .map(lambda line: json.loads(line[1]))
        .map(lambda x: Row(
            ambient=x['ambient'],
            bid=x['bid'],
            humidity=x['humidity'],
            objecttemp=x['objecttemp'],
            rh=x['rh'],
            time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
        )
    )
)

rowStream.foreachRDD(windowAverage)

Starten Sie StreamingContext

Starten Sie abschließend den StreamingContext und warten Sie, bis das Programm beendet ist.

ssc.start()
ssc.awaitTermination()

Skript ausführen

Führen Sie das in Teil 2 geschriebene Python-Skript von Raspberry Pi 3 aus.

Ausgabeergebnis

Die folgende Ausgabe wird angezeigt. Während die Ausgabe von DataFrame keine Zeitzone enthält, wird die Zeitzone zum Ergebnis der Fensteraggregation hinzugefügt.

1 Minute Batch DataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid              |humidity         |objecttemp|rh             |time                 |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.96875  |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.90625  |75.714111328125|2017-08-01 23:44:13.0|
|28.75   |B0:B4:48:BD:DA:03|28.72314453125   |22.875    |75.616455078125|2017-08-01 23:44:23.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.15625  |75.616455078125|2017-08-01 23:44:34.0|
|28.75   |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125  |75.616455078125|2017-08-01 23:44:44.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.125    |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+

Durchschnittswert des 2-Minuten-Fensters
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]

Recommended Posts

Stream-Verarbeitung von Python und SensorTag, Kafka, Spark Streaming - Teil 6: Fensteraggregation von PySpark Streaming von Jupyter
Stream-Verarbeitung von Python und SensorTag, Kafka, Spark Streaming - Teil 1: Raspberry Pi 3
Streaming von Python und SensorTag, Kafka, Spark Streaming-Teil 5: Verbindung von Jupyter zu Spark mit Apache Toree