Endlich können Sie den Titelcode ausführen. Senden Sie SensorTag-Daten zur Aggregation von PySpark Streaming-Fenstern an Kafka. Verwenden Sie Jupyter als interaktive Ausführungsumgebung für Python.
Wir werden die bisher vorbereiteten Python-Skripte und die Kafka- und Spark-Cluster verwenden.
Notebook
Führen Sie den Code interaktiv in Jupyters Notizbuch aus, um ihn anzuzeigen. Jeder der folgenden Absätze entspricht einer Zelle. Öffnen Sie Jupyter in einem Webbrowser und wählen Sie "Python 3" über die Schaltfläche "Neu" oben rechts.
PYSPARK_SUBMIT_ARGS
Die Jar-Datei von Scala ist in der Versionsspezifikation ziemlich streng. Spark Streaming + Kafka Integrationshandbuch
gibt an, dass es zwei Jar-Dateien gibt, um über Spark Streaming eine Verbindung zu Kafka herzustellen. Spark-Streaming-Kafka-0-8 mit Kafka-Version 0.8.2.1
oder höher ) Und spark-Streaming-kafka-0-10 von 0.10
oder höher. Die Version von Kafka, die ich dieses Mal benutze, ist "0.10.2.1", aber ich werde "Spark-Streaming-Kafka-0-8" verwenden, das Python unterstützt.
Der Paketname steht für jede Version, z. B. Spark-Streaming-Kafka-<Kafka-Version>
_<Scala-Version>
:<Sfunken-Version>
.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'
Installieren Sie das Paket kafka-python. Verwenden Sie Jupyters Magic Command (http://ipython.readthedocs.io/en/stable/interactive/magics.html).
!pip install kafka-python
import
Importieren Sie die für Spark Streaming und Kafka erforderlichen Pakete.
import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from kafka import KafkaProducer
Seit Spark 2.x ist der Kontext von Spark verwirrend geworden, aber ich werde SparkSession.builder als Einstiegspunkt verwenden. StreamingContexts werden in Stapelintervallen von 1 Minute erstellt.
spark = (
SparkSession
.builder
.getOrCreate()
)
sc = spark.sparkContext
ssc = StreamingContext(sc, 60)
Kafka Producer
brokers = "<IP-Adresse von Kafka Broker>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"
producer = KafkaProducer(bootstrap_servers=brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Dies ist die Funktion, die die Hauptverarbeitung dieses Skripts ausführt. Erstellen Sie einen DataFrame, indem Sie das StructType-Schema auf das von Kafka JSON konvertierte RDD anwenden. Berechnen Sie den Durchschnittswert von Umgebungstemperatur (Umgebungstemperatur) und Luftfeuchtigkeit (rh) in einem 2-Minuten-Fenster mithilfe der Fensteraggregationsfunktion von DataFrame.
Da DataFrame die Zeitzone während der Verarbeitung löscht, wird die Zeitzone "Asien / Tokio" hinzugefügt, damit das Ergebnis der Fensteraggregation leicht verstanden werden kann.
def windowAverage(rdd):
schema = StructType([
StructField('ambient', DoubleType(), True),
StructField('bid', StringType(), True),
StructField('humidity', DoubleType(), True),
StructField('objecttemp', DoubleType(), True),
StructField('rh', DoubleType(), True),
StructField('time', TimestampType(), True),
])
streamingInputDF = spark.createDataFrame(
rdd, schema=schema
)
print('1 Minute Batch DataFrame')
streamingInputDF.show(truncate=False)
averageDF = (
streamingInputDF
.groupBy(
streamingInputDF.bid,
window("time", "2 minute"))
.avg("ambient","rh")
)
sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0],
'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(),
'ambient': x[2],
'rh': x[3]})
if not sinkRDD.isEmpty():
print('Durchschnittswert des 2-Minuten-Fensters')
sinkList = sinkRDD.collect()
print(sinkList)
for sink in sinkList:
producer.send(sinkTopic, sink)
Gibt die Kafka Broker-IP-Adresse und das Thema zum Senden der SensorTag-JSON-Zeichenfolge von Raspberry Pi 3 an. Deserialisieren Sie die JSON-Zeichenfolge Zeile für Zeile pyspark.sql.RowEin ... kreieren. Das Feld "Zeit" konvertiert den UNIX-Zeitstempel in Python datetime und entfernt die Zeitzone.
kafkaStream = KafkaUtils.createDirectStream(
ssc, [sourceTopic], {"metadata.broker.list":brokers})
rowStream = (
kafkaStream
.map(lambda line: json.loads(line[1]))
.map(lambda x: Row(
ambient=x['ambient'],
bid=x['bid'],
humidity=x['humidity'],
objecttemp=x['objecttemp'],
rh=x['rh'],
time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
)
)
)
rowStream.foreachRDD(windowAverage)
Starten Sie abschließend den StreamingContext und warten Sie, bis das Programm beendet ist.
ssc.start()
ssc.awaitTermination()
Führen Sie das in Teil 2 geschriebene Python-Skript von Raspberry Pi 3 aus.
Die folgende Ausgabe wird angezeigt. Während die Ausgabe von DataFrame keine Zeitzone enthält, wird die Zeitzone zum Ergebnis der Fensteraggregation hinzugefügt.
1 Minute Batch DataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid |humidity |objecttemp|rh |time |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.96875 |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.90625 |75.714111328125|2017-08-01 23:44:13.0|
|28.75 |B0:B4:48:BD:DA:03|28.72314453125 |22.875 |75.616455078125|2017-08-01 23:44:23.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.15625 |75.616455078125|2017-08-01 23:44:34.0|
|28.75 |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125 |75.616455078125|2017-08-01 23:44:44.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.125 |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+
Durchschnittswert des 2-Minuten-Fensters
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]
Recommended Posts