Window aggregation of SensorTag with Kafka Streams

Previously, I tried window aggregation of SensorTag data with PySpark Streaming, using Jupyter as the working environment. There are several other stream processing frameworks, but next I'll try Kafka Streams. Unlike Spark, it is a library rather than a cluster. Currently, Java is the only officially supported development language.

Java environment

I'll write the code with Eclim built on Ubuntu 16.04 and build the project with Maven.

Project

Create the following files in the project directory. The complete code can be found in the repository (https://github.com/masato/streams-kafka-examples).

$  tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── kafka
        │                       ├── App.java
        │                       ├── SensorSumDeserializer.java
        │                       ├── SensorSum.java
        │                       └── SensorSumSerializer.java
        └── resources
            └── logback.xml

9 directories, 7 files

App.java

I will explain the code in several parts.

Constants

The topic names and other settings are obtained from environment variables defined in pom.xml. WINDOWS_MINUTES is the window aggregation interval, and COMMIT_MINUTES is the interval at which Kafka Streams commits the cache, described below. Both are specified in minutes.

App.java


public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
    private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
    private static final long WINDOWS_MINUTES = 2L;
    private static final long COMMIT_MINUTES = 3L;

Serialization

Create a serializer and deserializer for the records. A Kafka Streams app implements its processing flow by saving intermediate results to topics. A SerDe is defined by combining the deserializer that reads records from a topic with the serializer that writes records to it; a SerDe is required for each key and value type of a topic.

App.java


    public static void main(String[] args) throws Exception {

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final Serializer<SensorSum> sensorSumSerializer =
            new SensorSumSerializer();
        final Deserializer<SensorSum> sensorSumDeserializer =
            new SensorSumDeserializer();
        final Serde<SensorSum> sensorSumSerde =
            Serdes.serdeFrom(sensorSumSerializer,
                             sensorSumDeserializer);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

Creating a KStream

First, call stream() on the KStreamBuilder to create a [KStream](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). The topic's key is a string and its value is JSON, so specify the String SerDe and the JsonNode SerDe.

App.java


        final KStreamBuilder builder = new KStreamBuilder();

        LOG.info("Starting Sorting Job");

        final KStream<String, JsonNode> source =
            builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);

Creating a KGroupedStream

The SensorTag message arrives on the Kafka topic as a JSON string from the Raspberry Pi 3.

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}

A KStream record is a KeyValue object holding a key and a value. Since we want to window-aggregate only the average ambient temperature (ambient), we call map() to create a new KStream of key and ambient temperature pairs.

Then call groupByKey() to group the records by key and create a KGroupedStream. The key is a string and the value is the ambient temperature as a double, so specify the corresponding SerDes.

App.java


        final KGroupedStream<String, Double> sensors =
            source
            .map((k, v) -> {
                    double ambient = v.get("ambient").asDouble();
                    return KeyValue.pair(k, ambient);
                })
            .groupByKey(stringSerde, doubleSerde);

Creating a KTable from the KStream

Call aggregate() on the KGroupedStream to create a KTable. The KTable keeps, for each key and window interval, the running total of the values and the number of records.

The first argument of aggregate() is the [Initializer](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Initializer.html), which initializes the aggregate used for the stream aggregation; here it initializes the SensorSum that holds the window aggregation state. The second argument implements the Aggregator: it receives the key and value of the current record along with the SensorSum produced while processing the previous record, and returns a new SensorSum with the total and the record count incremented each time data arrives. The third argument defines the 2-minute window with [TimeWindows](http://apache.mesi.com.ar/kafka/0.10.2.0/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html). The fourth argument is the SerDe of SensorSum, and the fifth is the name of the state store that holds the aggregation state.

App.java


        final KTable<Windowed<String>, SensorSum> sensorAgg =
            sensors
            .aggregate(() -> new SensorSum(0D, 0)
                       , (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
                       , TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
                       , sensorSumSerde,
                       "sensorSum");

Creating a KStream from the KTable

Calculate the mean with mapValues() on the KTable. Dividing the sum by the record count yields a new KTable with Double records. From there, call toStream() to create a KStream. Each record is formatted into a JSON string containing a timestamp, the key, and the mean, and is output to the stream. Timestamps use ISO 8601 to ease data exchange between different systems. Finally, the records are saved to the specified sink topic.

App.java


        final DateTimeFormatter fmt =
            DateTimeFormatter.ISO_OFFSET_DATE_TIME;

        sensorAgg
            .<Double>mapValues((v) -> ((double) v.sum / v.count))
            .toStream()
            .map((key, avg) -> {
                    long end = key.window().end();
                    ZonedDateTime zdt =
                        new Date(end).toInstant()
                        .atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);
                    String bid = key.key();
                    String retval =
                        String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
                                      time, bid, avg);
                    LOG.info(retval);
                    return new KeyValue<String,String>(bid, retval);
             })
            .to(SINK_TOPIC);

Starting Kafka Streams

Create a KafkaStreams instance from the configuration object and the builder, and start the Kafka Streams app. Also, register a shutdown hook so that Kafka Streams is closed on SIGTERM.

App.java


        final StreamsConfig config = new StreamsConfig(getProperties());
        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Kafka Streams settings and timeouts

Create the Properties used to configure Kafka Streams from environment variables.

App.java


    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                  System.getenv("APPLICATION_ID_CONFIG"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                  System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                  TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));

        return props;
    }

COMMIT_INTERVAL_MS_CONFIG

Initially, I didn't change StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. Records are logged in the KStream's map() before being saved to the sink topic. I wanted each 2-minute window's aggregated result to be output only once, at the end of the window, but it was actually output an unpredictable 4-5 times.

{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}

Judging by related articles, this is expected behavior that stems from the KTable's changelog stream semantics. A KTable holds no final result of a window aggregation; instead, the values updated in its cache are committed at regular intervals. To remove the duplicate records, it seems you need to implement your own logic with transform() and process() after converting the KTable to a KStream with toStream(), as sketched below.
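
For reference, here is a minimal sketch of such a deduplication step using transform() and a state store. This is my own illustration, not code from the repository: `formatted` is a hypothetical variable standing for the KStream<String, String> built by toStream().map(...) above, and the sketch only drops a record whose value is identical to the last one forwarded for the same key, so intermediate updates within a window still pass through.


import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

        // Register a state store that remembers the last value forwarded per key.
        builder.addStateStore(Stores.create("dedup-store")
                              .withStringKeys()
                              .withStringValues()
                              .persistent()
                              .build());

        // "formatted" is assumed to be the KStream<String, String> from above.
        formatted
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, String> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    store = (KeyValueStore<String, String>)
                        context.getStateStore("dedup-store");
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    if (value.equals(store.get(key))) {
                        return null; // identical to the last forwarded value: drop it
                    }
                    store.put(key, value);
                    return KeyValue.pair(key, value);
                }

                @Override
                public KeyValue<String, String> punctuate(long timestamp) {
                    return null; // nothing to emit on a schedule
                }

                @Override
                public void close() {}
            }, "dedup-store")
            .to(SINK_TOPIC);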

You cannot eliminate duplicate records completely, but you can reduce the number of cache commits by increasing StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. The default value is 30 seconds (http://docs.confluent.io/3.2.1/streams/developer-guide.html#optional-configuration-parameters).

Other classes

Prepare the model (SensorSum.java), serializer (SensorSumSerializer.java), and deserializer (SensorSumDeserializer.java) classes. The serializer implements serialize() to convert the properties of SensorSum into a byte array, allocating a buffer of 8 bytes for the double holding the ambient temperature total and 4 bytes for the int holding the record count.
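
SensorSum itself is just a holder for the aggregation state. Below is a minimal sketch consistent with how App.java uses it (public sum and count fields and a two-argument constructor); see the repository for the actual class.


public class SensorSum {

    public double sum; // running total of the ambient temperature
    public int count;  // number of records aggregated so far

    public SensorSum(double sum, int count) {
        this.sum = sum;
        this.count = count;
    }
}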

SensorSumSerializer.java


    public byte[] serialize(String topic, SensorSum data) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
        buffer.putDouble(data.sum);
        buffer.putInt(data.count);

        return buffer.array();
    }
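
The deserializer reads the fields back in the same order. A minimal sketch of deserialize(), assuming the byte layout above (the actual implementation is in the repository):


    public SensorSum deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        double sum = buffer.getDouble(); // 8-byte ambient temperature total
        int count = buffer.getInt();     // 4-byte record count

        return new SensorSum(sum, count);
    }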

Run

Run Kafka Streams with the Exec Maven Plugin (http://www.mojohaus.org/exec-maven-plugin/).

$ mvn clean install exec:exec@json

I set the window interval to 2 minutes and the cache commit interval to 3 minutes. Duplicate output still appears a few times, but it is noticeably reduced.

{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}
