Study Flink with Kafka Exercise Code

Environment

OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ

Add dependencies to pom.xml to use Kafka

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

Write data to Kafka

To write data to Kafka, the program writes a DataStream to a Kafka topic.
Instruction: http://training.data-artisans.com/exercises/toFromKafka.html
The source code is here.

  1. Pass taxi ride data as input and filter out data outside of NYC
    val rides = env.addSource(new TaxiRideSource(input, maxDelay, speed))
    val filteredRides = rides.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
  2. To write the DataStream of TaxiRide records to a Kafka topic, use Flink's Kafka connector, which provides the FlinkKafkaProducer010 class. Here, LOCAL_KAFKA_BROKER is the "host_ip:port" address of the Kafka broker, and CLEANSED_RIDES_TOPIC is the name of the topic the DataStream is written to. When the program runs, its results are appended to the Kafka topic. Because Kafka topics are designed as durable logs, restarting the program does not overwrite the topic; the new records are appended after the existing ones.
    filteredRides.addSink(
      new FlinkKafkaProducer010[TaxiRide](
        LOCAL_KAFKA_BROKER,
        CLEANSED_RIDES_TOPIC,
        new TaxiRideSchema))
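
The filtering in step 1 can be tried without a Flink cluster. The following is a minimal sketch on a plain Scala collection, not the exercise's implementation: `Ride` is a hypothetical stand-in for TaxiRide, and the bounding-box values are assumptions that may differ from what GeoUtils actually uses.

```scala
// Hypothetical stand-in for the exercise's TaxiRide; only the fields the filter needs.
final case class Ride(rideId: Long, startLon: Float, startLat: Float, endLon: Float, endLat: Float)

object NycFilter {
  // Assumed bounding box for the NYC area; the real GeoUtils may use other values.
  val LonWest: Float = -74.05f
  val LonEast: Float = -73.7f
  val LatSouth: Float = 40.5f
  val LatNorth: Float = 41.0f

  // True when the point lies inside the assumed bounding box.
  def isInNYC(lon: Float, lat: Float): Boolean =
    lon >= LonWest && lon <= LonEast && lat >= LatSouth && lat <= LatNorth

  // Keep only rides that both start and end inside the box,
  // mirroring the predicate passed to rides.filter above.
  def cleanse(rides: Seq[Ride]): Seq[Ride] =
    rides.filter(r => isInNYC(r.startLon, r.startLat) && isInNYC(r.endLon, r.endLat))
}
```

For example, `NycFilter.cleanse` drops a ride that starts in Manhattan but ends outside the box, because both endpoints must pass the check.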

Memo: What is a serializer?

According to the instruction, TaxiRideSchema in the code above is a serializer. What is a serializer?

From https://stackoverflow.com/questions/633402/what-is-serialization:
Serialization is the process of turning an object in memory into a stream of bytes so you can do stuff like store it on disk or send it over the network. Deserialization is the reverse process: turning a stream of bytes into an object in memory.
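
To make the definition concrete, here is a minimal round-trip sketch in plain Scala. `SimpleRide` and `SimpleRideSchema` are hypothetical names invented for this illustration; the real TaxiRideSchema has more fields and implements Flink's schema interfaces, which are left out here.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical record type, much smaller than the exercise's TaxiRide.
final case class SimpleRide(rideId: Long, isStart: Boolean, passengerCnt: Int)

object SimpleRideSchema {
  // Serialization: object in memory -> bytes (a comma-separated UTF-8 string).
  def serialize(ride: SimpleRide): Array[Byte] =
    s"${ride.rideId},${ride.isStart},${ride.passengerCnt}".getBytes(StandardCharsets.UTF_8)

  // Deserialization: bytes -> object in memory.
  def deserialize(bytes: Array[Byte]): SimpleRide = {
    val fields = new String(bytes, StandardCharsets.UTF_8).split(",")
    require(fields.length == 3, s"expected 3 fields, got ${fields.length}")
    SimpleRide(fields(0).toLong, fields(1).toBoolean, fields(2).toInt)
  }
}
```

Calling `SimpleRideSchema.deserialize(SimpleRideSchema.serialize(ride))` gives back a value equal to `ride`, which is the property a Kafka producer and consumer pair relies on.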

Read data from Kafka

After the Kafka topic has been filled with the DataStream, the program reads its input from the topic by using a Kafka consumer data source.
Instruction: http://training.data-artisans.com/exercises/toFromKafka.html
The source code is here.

  1. Configure and create a Kafka consumer
    // configure Kafka consumer
    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP)
    // read the Kafka topic from the earliest offset when the group has no committed offset
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    // create a Kafka consumer
    val consumer = new FlinkKafkaConsumer010[TaxiRide](
        RideCleansingToKafka.CLEANSED_RIDES_TOPIC,
        new TaxiRideSchema,
        kafkaProps)    

Memo: What is FlinkKafkaConsumer010?

According to the API documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html), the constructor takes three arguments. RideCleansingToKafka.CLEANSED_RIDES_TOPIC is the name of the topic to consume. TaxiRideSchema is the schema that deserializes the records read from Kafka; the same class is used in filteredRides.addSink to serialize records when writing to Kafka. kafkaProps holds the properties used to configure the Kafka consumer client and the ZooKeeper client.

  2. Pass the new consumer to the environment as a data source. The resulting stream is used to identify popular places.
    val rides = env.addSource(consumer)
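
One detail of the consumer configuration is worth spelling out: in Kafka, auto.offset.reset only applies when the consumer group has no committed offset for a partition. The sketch below is a simplified model of that rule in plain Scala, not Kafka or Flink code; `OffsetResetSketch` and its parameters are hypothetical names, and Flink's own start-position and checkpoint handling is not modelled.

```scala
object OffsetResetSketch {
  // Simplified model: where would a consumer group start reading a partition?
  // committed: the group's committed offset, if one exists
  // reset:     the value of auto.offset.reset ("earliest" or "latest")
  // earliest / latest: the first and next-to-be-written offsets of the partition
  def startOffset(committed: Option[Long], reset: String, earliest: Long, latest: Long): Long =
    committed.getOrElse {
      reset match {
        case "earliest" => earliest
        case "latest"   => latest
        case other      => throw new IllegalArgumentException(s"unsupported reset policy: $other")
      }
    }
}
```

Under this model, a fresh group with "earliest" starts at the beginning of the topic, while a group that already committed an offset resumes from that offset regardless of the reset policy.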
