Stream processing with Python and SensorTag, Kafka, Spark Streaming - Part 6: PySpark Streaming window aggregation from Jupyter

The code promised in the title is finally ready to run. We will send SensorTag data to Kafka and perform window aggregation with PySpark Streaming, using Jupyter as an interactive execution environment for Python.

Preparation

We will use the Python scripts prepared so far, together with the Kafka and Spark clusters.

Notebook

We will run the code interactively in a Jupyter notebook. Each of the following code blocks corresponds to one cell. Open Jupyter in your web browser and select Python 3 from the New button in the upper right corner.

PYSPARK_SUBMIT_ARGS

Scala JAR files are fairly strict about version compatibility. The Spark Streaming + Kafka Integration Guide says there are two artifacts for connecting to Kafka from Spark Streaming: spark-streaming-kafka-0-8 (for Kafka 0.8.2.1 or higher) and spark-streaming-kafka-0-10 (for Kafka 0.10 or higher). The Kafka version I am using this time is 0.10.2.1, but I will use spark-streaming-kafka-0-8, which is the one that supports Python.

The package name encodes each version, in the form spark-streaming-kafka-<Kafka version>_<Scala version>:<Spark version>. Note that PYSPARK_SUBMIT_ARGS must be set before the SparkContext is created, which is why this cell comes first in the notebook.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'

Magic command

Install the kafka-python package using Jupyter's magic commands (http://ipython.readthedocs.io/en/stable/interactive/magics.html).

!pip install kafka-python

import

Import the packages required for Spark Streaming and Kafka.

import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from kafka import KafkaProducer

Spark context

Since Spark 2.x, the various Spark contexts have become a little confusing, but SparkSession.builder is the entry point. The StreamingContext is created with a batch interval of 1 minute (60 seconds).

spark = (
    SparkSession
        .builder
        .getOrCreate()
)

sc = spark.sparkContext
ssc = StreamingContext(sc, 60)
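
To keep the notebook output readable, you can optionally lower Spark's log verbosity. This line is my own addition, not part of the original notebook:

sc.setLogLevel('WARN')  # optional: suppress INFO-level log lines in notebook cells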

Kafka Producer

Define the list of Kafka brokers and the names of the input and output topics. A producer is also created to write the window aggregation results to the output Kafka topic.

brokers = "<IP address of Kafka Broker>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"

producer = KafkaProducer(bootstrap_servers=brokers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
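
If you want to confirm that the producer and its JSON serializer work before wiring up the stream, you can send a throwaway record by hand. This smoke test is my own addition; the payload is illustrative only:

# Send one disposable test record through the same serializer.
producer.send(sinkTopic, {'bid': 'test', 'ambient': 0.0, 'rh': 0.0})
producer.flush()  # block until the broker acknowledges the message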

Window aggregation

This is the function that performs the main processing of this script. It creates a DataFrame by applying a StructType schema to the RDD deserialized from the Kafka JSON, then calculates the average of the ambient temperature (ambient) and relative humidity (rh) over a 2-minute window using the DataFrame window aggregation function.

Also, since the DataFrame drops the time zone in the middle of processing, the Asia/Tokyo time zone is re-applied so that the window aggregation result is easier to read.

def windowAverage(rdd):
    schema = StructType([
        StructField('ambient', DoubleType(), True),
        StructField('bid', StringType(), True),
        StructField('humidity', DoubleType(), True),
        StructField('objecttemp', DoubleType(), True),
        StructField('rh', DoubleType(), True),
        StructField('time', TimestampType(), True),
    ])
        
    streamingInputDF = spark.createDataFrame(
        rdd, schema=schema
    )

    print('1 minute batch DataFrame')
    streamingInputDF.show(truncate=False)

    averageDF = (
        streamingInputDF
            .groupBy(
                streamingInputDF.bid,
                window("time", "2 minute"))
            .avg("ambient","rh")   
    )
    
    # Row fields by position: 0=bid, 1=window (use its end), 2=avg(ambient), 3=avg(rh).
    # The naive window end is localized as UTC and converted to Asia/Tokyo.
    sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0],
                                            'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(),
                                            'ambient': x[2],
                                            'rh': x[3]})
    if not sinkRDD.isEmpty():
        print('Average value of 2-minute window')
        sinkList = sinkRDD.collect()
        print(sinkList)

        for sink in sinkList:
            producer.send(sinkTopic, sink)
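
The time zone handling above can be checked in isolation. A minimal sketch, using the window end time that appears in the sample output later in this article:

from datetime import datetime
import pytz

# The DataFrame yields naive datetimes; treat them as UTC, then convert to JST.
window_end = datetime(2017, 8, 1, 23, 46)  # naive window end from the DataFrame
jst = pytz.utc.localize(window_end).astimezone(pytz.timezone('Asia/Tokyo'))
print(jst.isoformat())  # 2017-08-02T08:46:00+09:00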

Kafka stream creation

Specify the IP address of the Kafka broker and the topic to which the Raspberry Pi 3 sends the SensorTag JSON strings. The JSON strings are deserialized one record at a time and converted into pyspark.sql.Row objects. For the time field, the UNIX timestamp is converted to a Python datetime and the time zone information is removed.

kafkaStream = KafkaUtils.createDirectStream(
    ssc, [sourceTopic], {"metadata.broker.list":brokers})

rowStream = (
    kafkaStream
        .map(lambda line: json.loads(line[1]))
        .map(lambda x: Row(
            ambient=x['ambient'],
            bid=x['bid'],
            humidity=x['humidity'],
            objecttemp=x['objecttemp'],
            rh=x['rh'],
            time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
        )
    )
)

rowStream.foreachRDD(windowAverage)

Start StreamingContext

Finally, start the StreamingContext and wait for the program to terminate.

ssc.start()
ssc.awaitTermination()
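
awaitTermination blocks the cell until the stream is stopped. When you want to stop the stream from the notebook (after interrupting the kernel, for example), this is a sketch of a clean shutdown that keeps the SparkContext alive for further experiments; it is my addition, not from the original article:

# Stop the StreamingContext only; the notebook can then create
# a new StreamingContext on the same SparkContext and try again.
ssc.stop(stopSparkContext=False, stopGraceFully=True)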

Script execution

Run the Python script written in Part 2 on the Raspberry Pi 3 to start sending SensorTag data.
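
If you do not have a SensorTag and Raspberry Pi 3 at hand, you can still exercise the pipeline by publishing fabricated records in the same JSON shape. This sketch is my own, not the Part 2 script; the field names follow the schema above, and it assumes time is sent as a UNIX timestamp in seconds:

import json
import random
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='<IP address of Kafka Broker>:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Publish one fake SensorTag reading every 10 seconds.
while True:
    record = {
        'bid': 'B0:B4:48:BD:DA:03',           # sensor Bluetooth address
        'ambient': 28.0 + random.random(),     # ambient temperature
        'objecttemp': 22.0 + random.random(),  # object temperature
        'humidity': 28.0 + random.random(),    # humidity sensor temperature
        'rh': 75.0 + random.random(),          # relative humidity
        'time': time.time(),                   # UNIX timestamp in seconds
    }
    producer.send('sensortag', record)
    time.sleep(10)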

Output result

Output like the following will be displayed. The DataFrame output has no time zone, while the window aggregation result does.

1 minute batch DataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid              |humidity         |objecttemp|rh             |time                 |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.96875  |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125   |22.90625  |75.714111328125|2017-08-01 23:44:13.0|
|28.75   |B0:B4:48:BD:DA:03|28.72314453125   |22.875    |75.616455078125|2017-08-01 23:44:23.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.15625  |75.616455078125|2017-08-01 23:44:34.0|
|28.75   |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125  |75.616455078125|2017-08-01 23:44:44.0|
|28.75   |B0:B4:48:BD:DA:03|28.69293212890625|23.125    |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+

Average value of 2-minute window
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]
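
To confirm that the aggregated records actually arrive on the sink topic, you can read them back with kafka-python. A minimal sketch; the original article does not include a consumer:

import json
from kafka import KafkaConsumer

# Read the window aggregation results back from the sink topic.
consumer = KafkaConsumer('sensortag-sink',
                         bootstrap_servers='<IP address of Kafka Broker>:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    print(message.value)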
