Access Apache Kafka with Micronaut

Micronaut has Apache Kafka support, so I decided to give it a try.

Kafka Support

Micronaut can be used to create both Apache Kafka Producers and Consumers.

This is the version of Micronaut I used this time.

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

Micronaut 1.0.4 seems to support Apache Kafka 2.0.1.

Upgrade to Kafka 2.0.1

Accordingly, I downloaded Apache Kafka 2.0.1.

$ wget
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

Follow Apache Kafka's Quick Start to start Apache ZooKeeper and Apache Kafka Broker.

Quick Start / Start the server

## Apache Zookeeper
$ bin/ config/

## Apache Kafka(Broker)
$ bin/ config/

I created a topic named my-topic.

$ bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

Make an application

Now let's create an application. Create a project for Producer and Consumer (here, Listener).

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

After that, we will add and edit the source code for these two projects.

Make a Producer

Now let's write a Producer that sends a message to the Apache Kafka Broker.

$ cd hello-kafka-producer

Use the automatically generated main class as is.


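package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        // Start the Micronaut application (embedded HTTP server included).
        Micronaut.run(Application.class);
    }
}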

A Producer is defined by creating an interface annotated with @KafkaClient.

Defining @KafkaClient Methods

I created it like this. There is no need to write an implementation class for the interface.


package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    // Sends the message to my-topic; the first argument becomes the record key.
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

Settings are made with the @KafkaClient annotation and the configuration file.

In the @KafkaClient annotation, I set only acks.

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

The configuration file looks like this.


micronaut:
    application:
        name: hello-kafka-producer

kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

kafka.bootstrap.servers configures the connection settings for Apache Kafka Broker.

To configure the behavior of a Producer, use kafka.producers.[client-id].

By specifying an id, settings can be applied per @KafkaClient.

Per @KafkaClient Producer Properties

This time I didn't specify an id, so the name is default.
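
To make the naming rule concrete, here is a minimal sketch in plain Java of how the property path for a Producer setting is put together. The class and method names (ProducerPropertyPath, pathFor) and the id message-client are hypothetical and exist only for illustration. This is not Micronaut's own code, just the kafka.producers.[client-id] convention described here, with default standing in when no id is given.

```java
// Hypothetical helper, for illustration only: it mirrors the
// kafka.producers.[client-id] naming convention, not Micronaut's internals.
class ProducerPropertyPath {

    // Name used in this article when @KafkaClient does not specify an id.
    static final String DEFAULT_CLIENT_ID = "default";

    // Builds the configuration path for one Producer property.
    static String pathFor(String clientId, String property) {
        String id = (clientId == null || clientId.isEmpty()) ? DEFAULT_CLIENT_ID : clientId;
        return "kafka.producers." + id + "." + property;
    }

    public static void main(String[] args) {
        System.out.println(pathFor(null, "retries"));
        System.out.println(pathFor("message-client", "retries"));
    }
}
```

With no id, the retries setting resolves to kafka.producers.default.retries, which is where retries: 5 sits in this article's configuration. A client given the id message-client would instead be configured under kafka.producers.message-client.retries.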

Messages are sent through a method whose topic name is specified with the @Topic annotation.

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

If you want to specify a key, use @KafkaKey. If you do not use a key, you do not need to specify it (the key argument itself can be omitted).

Reactive Streams types such as those of RxJava can also be used, so I used RxJava's Single this time.

Reactive and Non-Blocking Method Definitions

Finally, here is a @Controller that uses the @KafkaClient.


package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    // Forwards the request body to Kafka, using the path variable as the record key.
    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}


Build the project.

$ ./mvnw package

This completes the Producer side.


Make a Consumer

Next, let's write the Consumer side.

$ cd hello-kafka-listener

Kafka Consumers Using @KafkaListener

Consumers are created using the @KafkaListener annotation. This is the class definition.


package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            // Automatic offset commits are disabled, so acknowledge manually.
            acknowledgement.ack();
        });
    }
}

Configure the Consumer with the @KafkaListener annotation, and put the @Topic annotation on the method that receives the message.

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

The message is received as a Reactive type.

Receiving and returning Reactive Types

I also acknowledge each message explicitly (ACK).

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
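
Why the explicit ACK matters with OffsetStrategy.DISABLED can be illustrated without a Broker. The following is a toy in-memory model in plain Java, not Micronaut or Kafka code. ToyConsumer and all of its methods are hypothetical names. It assumes the usual Kafka behavior that a consumer restarting in the same group resumes from the last committed offset, so a message that was received but never acknowledged is delivered again.

```java
import java.util.Arrays;
import java.util.List;

// Toy model, for illustration only: offsets are committed solely by ack().
class ToyConsumer {
    private final List<String> log;   // stands in for one topic partition
    private int position = 0;         // next offset to read
    private int committed = 0;        // offset to resume from after a restart

    ToyConsumer(List<String> log) {
        this.log = log;
    }

    // Returns the next message, or null when the log is exhausted.
    String poll() {
        return position < log.size() ? log.get(position++) : null;
    }

    // Commits the offset following the most recently polled message.
    void ack() {
        committed = position;
    }

    // Simulates a restart: reading resumes from the committed offset.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(Arrays.asList("value1", "value2", "value3"));
        consumer.poll();
        consumer.ack();      // value1 is acknowledged
        consumer.poll();     // value2 is received but never acknowledged
        consumer.restart();
        System.out.println(consumer.poll()); // prints value2: it is delivered again
    }
}
```

In this model, forgetting to call ack() means every restart replays the unacknowledged messages, which is the trade-off of taking offset management into your own hands.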

This time, the only Kafka setting is the connection to the Broker. Since the Producer runs on the same host, I also set micronaut.server.port to something other than 8080.


micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

kafka:
    bootstrap:
        servers: localhost:9092

Build this project as well.

$ ./mvnw package

The Consumer side is now ready.

Try it out

Let's check that it works.

## Launch Producer
$ java -jar target/hello-kafka-producer-0.1.jar

## Start Consumer
$ java -jar target/hello-kafka-listener-0.1.jar

Let's send a few messages to the Producer.

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
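
The same requests can also be sent from Java. Below is a minimal sketch that uses only the JDK's HttpURLConnection (available on the Java 8 used here). MessagePoster and post are hypothetical names, and the sketch assumes the Producer from this article is listening on localhost:8080 and that keys are simple strings that need no URL encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative client only; MessagePoster and post(...) are hypothetical names.
class MessagePoster {

    // POSTs the value as text/plain to {baseUrl}/message/{key} and returns the response body.
    static String post(String baseUrl, String key, String value) {
        try {
            URL url = new URL(baseUrl + "/message/" + key);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "text/plain");
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(value.getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = connection.getInputStream()) {
                ByteArrayOutputStream body = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    body.write(buffer, 0, read);
                }
                return new String(body.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                connection.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumes the Producer from this article is listening on localhost:8080.
        for (int i = 1; i <= 3; i++) {
            System.out.println(post("http://localhost:8080", "key" + i, "value" + i));
        }
    }
}
```

Running main against the Producer sends key1/value1 through key3/value3, the same three messages as the curl commands.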

On the Consumer side, the received message is displayed like this.

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

It seems to be working.

