Enfin, vous êtes prêt à exécuter le code du titre. Envoyez les données SensorTag à Kafka pour l'agrégation de la fenêtre PySpark Streaming. Utilisez Jupyter comme environnement d'exécution interactif pour Python.
Nous utiliserons les scripts Python que nous avons préparés jusqu'à présent et les clusters Kafka et Spark.
Notebook
Exécutez le code de manière interactive dans Jupyter's Notebook pour le voir. Chacun des paragraphes suivants correspond à une cellule. Ouvrez Jupyter à partir d'un navigateur Web et sélectionnez «Python 3» dans le bouton «Nouveau» en haut à droite.
PYSPARK_SUBMIT_ARGS
Le fichier Jar de Scala est assez strict dans la spécification de version. Guide d'intégration Spark Streaming + Kafka
indique qu'il existe deux fichiers Jar pour se connecter à Kafka depuis Spark Streaming. Spark-streaming-kafka-0-8 avec la version Kafka 0.8.2.1
ou supérieure ) Et spark-streaming-kafka-0-10 de «0.10» ou supérieur. La version de Kafka que j'utilise cette fois est 0.10.2.1
, mais j'utiliserai spark-streaming-kafka-0-8
qui prend en charge Python.
Le nom du package représente chaque version, par exemple spark-streaming-kafka-<Kafka version>
_<Scala version>
:<Spark version>
.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'
Installez le package kafka-python. Utilisez la commande magique de Jupyter (http://ipython.readthedocs.io/en/stable/interactive/magics.html).
!pip install kafka-python
import
Importez les packages requis pour Spark Streaming et Kafka.
import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from kafka import KafkaProducer
Depuis Spark 2.x, le contexte de Spark est devenu déroutant, mais j'utiliserai SparkSession.builder comme point d'entrée. StreamingContexts sont créés à des intervalles de lots de 1 minute.
spark = (
SparkSession
.builder
.getOrCreate()
)
sc = spark.sparkContext
ssc = StreamingContext(sc, 60)
Kafka Producer
Définit une liste des courtiers Kafka, des noms de sujets d'entrée et de sortie. Un producteur est également créé pour afficher le résultat de l'agrégation de fenêtres dans la rubrique Kafka.
brokers = "<Adresse IP du courtier Kafka>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"
producer = KafkaProducer(bootstrap_servers=brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
C'est la fonction qui effectue le traitement principal de ce script. Créez un DataFrame en appliquant le schéma StructType au RDD Kafka JSON converti. Calculez la valeur moyenne de la température ambiante (ambiante) et de l'humidité (hr) dans une fenêtre de 2 minutes à l'aide de la fonction d'agrégation de fenêtres de DataFrame.
De plus, puisque DataFrame supprime le fuseau horaire au milieu du traitement, le fuseau horaire «Asie / Tokyo» est ajouté afin que le résultat de l'agrégation des fenêtres puisse être facilement compris.
def windowAverage(rdd):
schema = StructType([
StructField('ambient', DoubleType(), True),
StructField('bid', StringType(), True),
StructField('humidity', DoubleType(), True),
StructField('objecttemp', DoubleType(), True),
StructField('rh', DoubleType(), True),
StructField('time', TimestampType(), True),
])
streamingInputDF = spark.createDataFrame(
rdd, schema=schema
)
print('Lot de 1 minute DataFrame')
streamingInputDF.show(truncate=False)
averageDF = (
streamingInputDF
.groupBy(
streamingInputDF.bid,
window("time", "2 minute"))
.avg("ambient","rh")
)
sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0],
'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(),
'ambient': x[2],
'rh': x[3]})
if not sinkRDD.isEmpty():
print('Valeur moyenne d'une fenêtre de 2 minutes')
sinkList = sinkRDD.collect()
print(sinkList)
for sink in sinkList:
producer.send(sinkTopic, sink)
Spécifie l'adresse IP du courtier Kafka et la rubrique pour envoyer la chaîne JSON SensorTag à partir de Raspberry Pi 3. Désérialisez la chaîne JSON ligne par ligne pyspark.sql.RowCréer un. Le champ time
convertit l'horodatage UNIX en datetime Python et supprime le fuseau horaire.
kafkaStream = KafkaUtils.createDirectStream(
ssc, [sourceTopic], {"metadata.broker.list":brokers})
rowStream = (
kafkaStream
.map(lambda line: json.loads(line[1]))
.map(lambda x: Row(
ambient=x['ambient'],
bid=x['bid'],
humidity=x['humidity'],
objecttemp=x['objecttemp'],
rh=x['rh'],
time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
)
)
)
rowStream.foreachRDD(windowAverage)
Enfin, démarrez le StreamingContext et attendez que le programme s'arrête.
ssc.start()
ssc.awaitTermination()
Exécutez le script Python écrit dans Partie 2 à partir de Raspberry Pi 3.
La sortie suivante sera affichée. Bien qu'il n'y ait pas de fuseau horaire dans la sortie de DataFrame, le fuseau horaire est ajouté au résultat de l'agrégation de fenêtres.
Lot de 1 minute DataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid |humidity |objecttemp|rh |time |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.96875 |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.90625 |75.714111328125|2017-08-01 23:44:13.0|
|28.75 |B0:B4:48:BD:DA:03|28.72314453125 |22.875 |75.616455078125|2017-08-01 23:44:23.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.15625 |75.616455078125|2017-08-01 23:44:34.0|
|28.75 |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125 |75.616455078125|2017-08-01 23:44:44.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.125 |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+
Valeur moyenne d'une fenêtre de 2 minutes
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]
Recommended Posts