Easy Pub/Sub messaging with Apache Kafka

Contents of this document

Let's implement simple Pub/Sub messaging with Apache Kafka.

Figure: application configuration

Build the operating environment for Kafka

I used Docker to build the operating environment for Kafka. As of December 31, 2020, the Kafka version is 2.6.0.

Create docker-compose.yml

Create docker-compose.yml with the following content:

docker-compose.yml


version: '3'

services:

  test-zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  test-kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: test-zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Start a Docker container

Open a PowerShell console in the folder that contains docker-compose.yml and run the following command:

powershell


PS> docker-compose up -d
Creating network "kafka_default" with the default driver
Creating kafka_test-zookeeper_1 ... done
Creating kafka_test-kafka_1     ... done

You can check whether the containers started normally with the following command:

powershell


PS> docker-compose ps
         Name                       Command               State                         Ports
--------------------------------------------------------------------------------------------------------------------
kafka_test-kafka_1       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
kafka_test-zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

Stop (and delete) the Docker container

Open a PowerShell console in the folder that contains docker-compose.yml and run the following command:

powershell


PS> docker-compose down

Check the status of Kafka

Check the status of Kafka on the Docker Desktop dashboard.

Figure: Kafka startup log

If Kafka started normally, the log should contain the following line:

test-kafka_1 | [2020-12-30 04:19:50,593] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)

Open the CLI. It can be launched from the icon on the Kafka instance page.

Figure: launching the CLI

To display the list of registered topics, run the following command. If no topics exist, nothing is displayed.

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --list

To register a topic, run the following command:

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --create --topic test-topic --replication-factor 1 --partitions 1
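
The same two steps (create a topic, then list topics) could also be done from C# with the admin client in Confluent.Kafka. The following is only a sketch under assumptions: the class and method names (TopicAdminSketch, EnsureTopicAsync) are made up for illustration, the broker from docker-compose.yml is assumed to be reachable at the address you pass in (for example localhost:9092), and it needs the Confluent.Kafka NuGet package and a running broker to do anything useful.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper, not part of the article's source code.
public static class TopicAdminSketch
{
    // Creates the topic with 1 partition and replication factor 1, then lists all topics.
    public static async Task EnsureTopicAsync(string bootstrapServers, string topic)
    {
        var config = new AdminClientConfig { BootstrapServers = bootstrapServers };
        using var admin = new AdminClientBuilder(config).Build();

        try
        {
            await admin.CreateTopicsAsync(new[]
            {
                new TopicSpecification { Name = topic, NumPartitions = 1, ReplicationFactor = 1 }
            }).ConfigureAwait(false);
        }
        catch (CreateTopicsException ex)
            when (ex.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // The topic already exists, which is fine for this sketch.
        }

        // Roughly what `kafka-topics.sh --list` shows.
        var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
        foreach (var t in metadata.Topics)
        {
            Console.WriteLine($"{t.Topic} (partitions: {t.Partitions.Count})");
        }
    }
}
```

A call such as `await TopicAdminSketch.EnsureTopicAsync("localhost:9092", "test-topic");` would correspond to the two CLI commands above.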

Implement a Pub/Sub application

The applications are implemented as .NET Core 3.1 console applications. I used Confluent.Kafka 1.5.3 as the Kafka client library.

The full source code has been uploaded to GitHub.

Keys and messages

The following key and message types are exchanged between the applications.

/// <summary>
///Key
/// </summary>
public readonly struct SampleMessageKey : IEquatable<SampleMessageKey>
{
    public SampleMessageKey(string key)
    {
        Key = key;
    }

    public string Key { get; }

    public override bool Equals(object obj)
    {
        return obj is SampleMessageKey key && Equals(key);
    }
    public bool Equals(SampleMessageKey other)
    {
        return Key == other.Key;
    }
    public override int GetHashCode()
    {
        return 990326508 + EqualityComparer<string>.Default.GetHashCode(Key);
    }
    public override string ToString()
    {
        return Key;
    }
}

/// <summary>
///message
/// </summary>
public class SampleMessageBody
{
    public SampleMessageBody(DateTimeOffset time, string message)
    {
        Time = time;
        Message = message;
    }

    public DateTimeOffset Time { get; }
    public string Message { get; }

    public override string ToString()
    {
        return Message;
    }
}
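
The article's serializers (obtained later through SampleSerializerFactory) are not shown here, so the following is only one possible way the message body could be turned into bytes and back. It is a sketch using System.Text.Json from the base class library; the class name SampleMessageJson and the JSON property names "time" and "message" are assumptions made for illustration, and the actual repository may serialize differently.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Hypothetical helper: encodes the two fields of a message body as UTF-8 JSON.
public static class SampleMessageJson
{
    public static byte[] Encode(DateTimeOffset time, string message)
    {
        using var stream = new MemoryStream();
        using (var writer = new Utf8JsonWriter(stream))
        {
            writer.WriteStartObject();
            writer.WriteString("time", time);       // written as an ISO 8601 string
            writer.WriteString("message", message);
            writer.WriteEndObject();
            writer.Flush();
        }
        return stream.ToArray();
    }

    public static (DateTimeOffset Time, string Message) Decode(byte[] data)
    {
        using var document = JsonDocument.Parse(new ReadOnlyMemory<byte>(data));
        JsonElement root = document.RootElement;
        return (root.GetProperty("time").GetDateTimeOffset(),
                root.GetProperty("message").GetString());
    }
}
```

Reading the fields by hand avoids relying on constructor-based deserialization, which System.Text.Json did not support for immutable types like SampleMessageBody at the time of .NET Core 3.1.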

Publisher application

A console application that publishes messages to Kafka at regular intervals. The bootstrap server and the topic to publish to are read from console input.

Entry point implementation

Program.cs


class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    /// <summary>
    ///Execute the process.
    /// </summary>
    /// <returns></returns>
    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"Launched the publisher in process {process}.");

        //Receive parameters from the console
        Console.WriteLine("Enter bootstrap servers (default 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"Enter a topic (default {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        //Generate a cancel token
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        //Generate behavior settings
        var publisherSetting = new MessagePublisherSetting()
        {
            BootstrapServers = bootstrapServers
        };

        //Method to generate key
        static SampleMessageKey GenerateKey()
        {
            return new SampleMessageKey(Guid.NewGuid().ToString());
        }

        //Generate a publisher
        var factory = new SampleMessagePublisherFactory(publisherSetting, new SampleLogger());

        using IMessagePublisher<SampleMessageBody> publisher
            = factory.CreatePublisher<SampleMessageKey, SampleMessageBody>(topic, GenerateKey);

        Console.WriteLine("The message sending process will start. Press Ctrl+C to exit.");

        //Issue messages at regular intervals
        int sequence = 0;
        TimeSpan interval = TimeSpan.FromSeconds(5);

        try
        {
            while (!cancelTokenSource.Token.IsCancellationRequested)
            {
                ++sequence;

                await publisher.PublishAsync(
                    new SampleMessageBody(DateTimeOffset.UtcNow, $"Message {sequence} (process {process})")
                    , cancelTokenSource.Token
                    ).ConfigureAwait(false);

                await Task.Delay(interval, cancelTokenSource.Token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            //Ctrl+C cancels the token while waiting; treat it as a normal exit
        }

        Console.WriteLine("The message sending process has ended.");
    }
}
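
To see how the send loop reacts to cancellation without involving Kafka, it can be isolated into a small helper. This is a sketch, not the article's code: PeriodicLoopSketch and its publishAsync delegate parameter are hypothetical names, and the delegate stands in for publisher.PublishAsync. The point it illustrates is that Task.Delay throws an OperationCanceledException (more precisely a TaskCanceledException) when the token is cancelled during the wait, so the loop has to catch it if cancellation should count as a normal exit.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper that mirrors the publisher's send loop.
public static class PeriodicLoopSketch
{
    // Calls publishAsync with an increasing sequence number until the token is cancelled.
    // Returns the number of calls that were made.
    public static async Task<int> RunAsync(
        Func<int, CancellationToken, Task> publishAsync,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        int sequence = 0;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                ++sequence;
                await publishAsync(sequence, cancellationToken).ConfigureAwait(false);
                await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled during the delay or the publish call.
        }
        return sequence;
    }
}
```

Because the loop only depends on a delegate, it can be exercised with a fake publish function and a short interval.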

Publisher implementation

We adopted a simple factory pattern.

SampleMessagePublisherFactory.cs


/// <summary>
///Generate a publisher.
/// </summary>
internal class SampleMessagePublisherFactory : MessagePublisherFactoryBase
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="publisherSetting">Publisher behavior settings</param>
    /// <param name="logger">Logger</param>
    internal SampleMessagePublisherFactory(MessagePublisherSetting publisherSetting, ILogger logger)
        : base(publisherSetting, logger)
    {
    }

    /// <summary>
    ///Get the serializer.
    /// </summary>
    /// <typeparam name="T">Type of object to be serialized</typeparam>
    /// <returns>Serializer</returns>
    protected override ISerializer<T> GetSerializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
///Basic implementation of publisher generation process.
/// </summary>
public abstract class MessagePublisherFactoryBase
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="publisherSetting">Publisher behavior settings</param>
    /// <param name="logger">Logger</param>
    protected MessagePublisherFactoryBase(MessagePublisherSetting publisherSetting, ILogger logger)
    {
        PublisherSetting = publisherSetting;
        Logger = logger;
    }

    /// <summary>
    ///Gets the publisher behavior settings.
    /// </summary>
    protected MessagePublisherSetting PublisherSetting { get; }

    /// <summary>
    ///Get the logger.
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    ///Generate a publisher.
    /// </summary>
    /// <typeparam name="TKey">Key type</typeparam>
    /// <typeparam name="TMessage">Message type</typeparam>
    /// <param name="topic">topic</param>
    /// <param name="keyGenerator">Key generation process</param>
    /// <returns>Publisher</returns>
    public MessagePublisher<TKey, TMessage> CreatePublisher<TKey, TMessage>(string topic, Func<TKey> keyGenerator)
    {
        return new MessagePublisher<TKey, TMessage>(
            GetSerializer<TKey>()
            , GetSerializer<TMessage>()
            , PublisherSetting
            , topic
            , keyGenerator
            , Logger
            );
    }

    /// <summary>
    ///Get the serializer.
    /// </summary>
    /// <typeparam name="T">Type of object to be serialized</typeparam>
    /// <returns>Serializer</returns>
    protected abstract ISerializer<T> GetSerializer<T>();
}

The publisher wraps a Kafka producer (IProducer<TKey, TMessage>) and sends the specified message to Kafka.

MessagePublisher.cs


/// <summary>
///Send a message to Kafka.
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TMessage">Message type</typeparam>
public class MessagePublisher<TKey, TMessage> : IMessagePublisher<TMessage>
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="keySerializer">Serializer for keys</param>
    /// <param name="messageSerializer">Serializer for messages</param>
    /// <param name="setting">Producer behavior settings</param>
    /// <param name="keyGenerator">Key generation process</param>
    /// <param name="topic">topic</param>
    /// <param name="logger">Logger</param>
    public MessagePublisher(ISerializer<TKey> keySerializer, ISerializer<TMessage> messageSerializer, MessagePublisherSetting setting, string topic, Func<TKey> keyGenerator, ILogger logger)
    {
        KeySerializer = keySerializer;
        MessageSerializer = messageSerializer;
        Topic = topic;
        KeyGenerator = keyGenerator;
        Logger = logger;
        Producer = BuildProducer(GetProducerConfig(setting));
    }

    /// <summary>
    ///Release the resources you are using.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    ///Release the resources you are using.
    /// </summary>
    protected virtual void Dispose(bool disposing)
    {
        TerminateProducer();
    }

    /// <summary>
    ///Gets the serializer for the key.
    /// </summary>
    private ISerializer<TKey> KeySerializer { get; }

    /// <summary>
    ///Gets the serializer for the message.
    /// </summary>
    private ISerializer<TMessage> MessageSerializer { get; }

    /// <summary>
    ///Get the logger.
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    ///Get the topic.
    /// </summary>
    private string Topic { get; }

    /// <summary>
    ///Gets the key generation process.
    /// </summary>
    private Func<TKey> KeyGenerator { get; }

    #region producer

    /// <summary>
    ///Get a producer.
    /// </summary>
    private IProducer<TKey, TMessage> Producer { get; }

    /// <summary>
    ///Release the producer.
    /// </summary>
    private void TerminateProducer()
    {
        if (Producer == null) { return; }
        Producer.Flush(TimeSpan.FromMilliseconds(10000));
        Producer.Dispose();
    }

    /// <summary>
    ///Generate a producer.
    /// </summary>
    /// <param name="config">Operation setting</param>
    /// <returns>producer</returns>
    protected virtual IProducer<TKey, TMessage> BuildProducer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var producerBuilder = new ProducerBuilder<TKey, TMessage>(config)
            .SetKeySerializer(KeySerializer)
            .SetValueSerializer(MessageSerializer)
            .SetErrorHandler(OnError)
            ;

        return producerBuilder.Build();
    }

    /// <summary>
    ///Get the producer's behavior settings.
    /// </summary>
    /// <param name="producerSetting">Producer behavior settings</param>
    /// <returns>Combination of key and value of operation setting</returns>
    protected virtual IEnumerable<KeyValuePair<string, string>> GetProducerConfig(MessagePublisherSetting producerSetting)
    {
        if (string.IsNullOrEmpty(producerSetting.BootstrapServers))
        {
            throw new ArgumentException("The bootstrap server is not configured.", nameof(producerSetting));
        }

        return new ProducerConfig()
        {
            BootstrapServers = producerSetting.BootstrapServers,
        };
    }

    /// <summary>
    ///Performs processing when an error occurs.
    /// </summary>
    /// <param name="producer"></param>
    /// <param name="error"></param>
    protected virtual void OnError(IProducer<TKey, TMessage> producer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
    }

    #endregion

    #region message issuance

    /// <summary>
    ///Publishes the specified message.
    /// </summary>
    /// <param name="message">message</param>
    /// <returns></returns>
    public async Task PublishAsync(TMessage message, CancellationToken cancellationToken)
    {
        var kafkaMessage = new Message<TKey, TMessage>()
        {
            Key = GenerateNewKey(),
            Value = message,
            Timestamp = new Timestamp(DateTimeOffset.UtcNow)
        };

        //Awaiting (instead of ContinueWith + Result) surfaces a produce failure without wrapping it in AggregateException
        var result = await Producer.ProduceAsync(Topic, kafkaMessage, cancellationToken).ConfigureAwait(false);
        OnPublished(result);
    }

    /// <summary>
    ///Performs the processing when the message is issued.
    /// </summary>
    /// <param name="result">Issuance result</param>
    protected virtual void OnPublished(DeliveryResult<TKey, TMessage> result)
    {
        WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
    }

    /// <summary>
    ///Generate a new key.
    /// </summary>
    /// <returns>Key</returns>
    private TKey GenerateNewKey()
    {
        return KeyGenerator();
    }

    #endregion

    #region logging

    /// <summary>
    ///Outputs the specified log.
    /// </summary>
    /// <param name="level">Log level</param>
    /// <param name="messageBuilder">Method to generate log message</param>
    /// <param name="exception">exception</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(DeliveryResult<TKey, TMessage> result)
    {
        return $"Issued a message.[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    #endregion
}
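
The IMessagePublisher<TMessage> interface that MessagePublisher implements is not listed in this article. Judging only from how it is used above (a using declaration and a PublishAsync call), it might look like the sketch below; the exact definition in the repository is an assumption. The InMemoryPublisher class is a hypothetical stand-in, added here to show how code written against the interface could be tried without a Kafka broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape of the interface, inferred from its usage in the article.
public interface IMessagePublisher<TMessage> : IDisposable
{
    Task PublishAsync(TMessage message, CancellationToken cancellationToken);
}

// Hypothetical in-memory implementation for experiments and tests.
public sealed class InMemoryPublisher<TMessage> : IMessagePublisher<TMessage>
{
    private readonly List<TMessage> _published = new List<TMessage>();

    // Messages in the order they were published.
    public IReadOnlyList<TMessage> Published => _published;

    public Task PublishAsync(TMessage message, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _published.Add(message);
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // Nothing to release in this sketch.
    }
}
```

Depending on the interface rather than on MessagePublisher<TKey, TMessage> keeps the key type out of the calling code, which is how the entry point above declares its publisher variable.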

Subscriber application

A console application that subscribes to messages from Kafka. The bootstrap server and the topic to subscribe to are read from console input. It uses the observer pattern with Reactive Extensions (System.Reactive).

Entry point implementation

Program.cs


class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"Started the subscriber in process {process}.");

        //Receive parameters from the console
        Console.WriteLine("Enter bootstrap servers (default 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"Enter the Consumer Group ID (default {Constants.DefaultComsumerGroupID}):");
        var groupID = Console.ReadLine();
        if (string.IsNullOrEmpty(groupID)) { groupID = Constants.DefaultComsumerGroupID; }

        Console.WriteLine($"Enter a topic (default {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        //Generate a cancel token
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        //Generate behavior settings
        var subscriberSetting = new MessageSubscriberSetting()
        {
            BootstrapServers = bootstrapServers,
            ConsumerGroupID = groupID
        };

        //Observe messages with the observable pattern
        var factory = new SampleMessageSubscriberFactory(subscriberSetting, new SampleLogger());

        Console.WriteLine("Starting to receive messages. Press Ctrl+C to exit.");

        var subscriber = factory.CreateSubscriber<SampleMessageKey, SampleMessageBody>(topic);
        using var releaser = subscriber.Subscribe(new SampleMessageObserver());

        await subscriber.SubscribeAsync(cancelTokenSource.Token).ConfigureAwait(false);

        Console.WriteLine("The message reception process has ended.");
    }
}
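
SampleMessageObserver, which is passed to Subscribe above, is not listed in this article. An observer only needs to implement the three IObserver<T> callbacks, so a minimal version could look like the sketch below. WritingObserver is a hypothetical name, and writing to a TextWriter (for example Console.Out) is an assumption chosen to keep the example easy to try; the repository's observer may do something else.

```csharp
using System;
using System.IO;

// Hypothetical observer that writes each notification to the given writer.
public sealed class WritingObserver<T> : IObserver<T>
{
    private readonly TextWriter _output;

    public WritingObserver(TextWriter output)
    {
        _output = output ?? throw new ArgumentNullException(nameof(output));
    }

    // Called for every message that the subscriber passes on.
    public void OnNext(T value) => _output.WriteLine($"Received: {value}");

    // Called when the subscriber reports an error.
    public void OnError(Exception error) => _output.WriteLine($"Error: {error.Message}");

    // Called when the subscriber stops receiving.
    public void OnCompleted() => _output.WriteLine("Completed.");
}
```

With the article's types this would be used as `subscriber.Subscribe(new WritingObserver<SampleMessageBody>(Console.Out))`.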

Subscriber implementation

We adopted a simple factory pattern.

SampleMessageSubscriberFactory.cs


/// <summary>
///Generate a subscriber.
/// </summary>
internal class SampleMessageSubscriberFactory : MessageSubscriberFactoryBase
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="subscriberSetting">Subscriber behavior settings</param>
    /// <param name="logger">Logger</param>
    internal SampleMessageSubscriberFactory(MessageSubscriberSetting subscriberSetting, ILogger logger)
        : base(subscriberSetting, logger)
    {
    }

    /// <summary>
    ///Get the deserializer.
    /// </summary>
    /// <typeparam name="T">Type of object to be deserialized</typeparam>
    /// <returns>Deserializer</returns>
    protected override IDeserializer<T> GetDeserializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
///Basic implementation of subscriber generation process.
/// </summary>
public abstract class MessageSubscriberFactoryBase
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="subscriberSetting">Subscriber behavior settings</param>
    /// <param name="logger">Logger</param>
    protected MessageSubscriberFactoryBase(MessageSubscriberSetting subscriberSetting, ILogger logger)
    {
        SubscriberSetting = subscriberSetting;
        Logger = logger;
    }

    /// <summary>
    ///Gets the subscriber behavior settings.
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    ///Get the logger.
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    ///Generate a subscriber.
    /// </summary>
    /// <typeparam name="TKey">Key type</typeparam>
    /// <typeparam name="TMessage">Message type</typeparam>
    /// <returns>Subscriber</returns>
    public MessageSubscriber<TKey, TMessage> CreateSubscriber<TKey, TMessage>(string topic)
    {
        return new MessageSubscriber<TKey, TMessage>(
            GetDeserializer<TKey>()
            , GetDeserializer<TMessage>()
            , SubscriberSetting
            , topic
            , Logger
            );
    }

    /// <summary>
    ///Get the deserializer.
    /// </summary>
    /// <typeparam name="T">Type of object to be deserialized</typeparam>
    /// <returns>Deserializer</returns>
    protected abstract IDeserializer<T> GetDeserializer<T>();
}

The subscriber wraps a Kafka consumer (IConsumer<TKey, TMessage>) and receives the messages published to the specified topic.

MessageSubscriber.cs


/// <summary>
///Monitor messages from Kafka.
/// </summary>
/// <typeparam name="TKey">Message key type</typeparam>
/// <typeparam name="TMessage">Message type</typeparam>
public class MessageSubscriber<TKey, TMessage> : System.Reactive.ObservableBase<TMessage>
{
    /// <summary>
    ///Create an instance.
    /// </summary>
    /// <param name="keyDeserializer">Deserializer for keys</param>
    /// <param name="messageDeserializer">Deserializer for messages</param>
    /// <param name="subscriberSetting">Operation setting</param>
    /// <param name="topic">topic</param>
    /// <param name="logger">Logger</param>
    public MessageSubscriber(IDeserializer<TKey> keyDeserializer, IDeserializer<TMessage> messageDeserializer, MessageSubscriberSetting subscriberSetting, string topic, ILogger logger) : base()
    {
        KeyDeserializer = keyDeserializer;
        MessageDeserializer = messageDeserializer;
        SubscriberSetting = subscriberSetting;
        Topic = topic;
        Logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
    }

    /// <summary>
    ///Get the logger.
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    ///Get the operation settings.
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    ///Get the topic.
    /// </summary>
    private string Topic { get; }

    #region receive

    /// <summary>
    ///Gets the deserializer for the key.
    /// </summary>
    private IDeserializer<TKey> KeyDeserializer { get; }

    /// <summary>
    ///Gets the deserializer for the message.
    /// </summary>
    private IDeserializer<TMessage> MessageDeserializer { get; }

    /// <summary>
    ///Start receiving messages.
    /// </summary>
    /// <param name="cancellation">Cancel token</param>
    public async Task SubscribeAsync(CancellationToken cancellation)
    {
        //Without a synchronization context (as in a console application),
        //the blocking loop below continues on a thread-pool thread after this await
        await Task.Yield();

        TimeSpan interval = SubscriberSetting.ConsumeInterval;

        using var consumer = BuildConsumer(GetConsumerConfig(SubscriberSetting));
        consumer.Subscribe(Topic);

        while (true)
        {
            if (cancellation.IsCancellationRequested) { break; }
            try
            {
                if (m_Observers.Count == 0) { continue; }

                ConsumeResult<TKey, TMessage> result = consumer.Consume(interval);

                if (result == null) { continue; }
                // TODO: IsPartitionEOF could not be reproduced during this verification.
                if (result.IsPartitionEOF) { continue; }

                WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
                NotifyMessage(result.Message.Value);

                consumer.Commit(result);
            }
            catch (Exception ex)
            {
                OnException(consumer, ex);
                break;
            }
        }

        NotifyComplated();
    }

    /// <summary>
    ///Generate a consumer.
    /// </summary>
    /// <param name="config">Operation setting</param>
    /// <returns>consumer</returns>
    protected IConsumer<TKey, TMessage> BuildConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var consumerBuilder = new ConsumerBuilder<TKey, TMessage>(config)
            .SetKeyDeserializer(KeyDeserializer)
            .SetValueDeserializer(MessageDeserializer)
            .SetErrorHandler(OnError)
            .SetLogHandler(OnLogging)
            ;

        return consumerBuilder.Build();
    }

    /// <summary>
    ///Gets the consumer behavior settings.
    /// </summary>
    /// <param name="consumerSetting">Consumer behavior settings</param>
    /// <returns>Combination of key and value of operation setting</returns>
    protected IEnumerable<KeyValuePair<string, string>> GetConsumerConfig(MessageSubscriberSetting consumerSetting)
    {
        if (string.IsNullOrEmpty(consumerSetting.BootstrapServers))
        {
            throw new ArgumentException("The bootstrap server is not configured.", nameof(consumerSetting));
        }

        if (string.IsNullOrEmpty(consumerSetting.ConsumerGroupID))
        {
            throw new ArgumentException("The consumer group ID has not been set.", nameof(consumerSetting));
        }

        return new ConsumerConfig()
        {
            BootstrapServers = consumerSetting.BootstrapServers,
            GroupId = consumerSetting.ConsumerGroupID,
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    /// <summary>
    ///Output the log.
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="log"></param>
    private void OnLogging(IConsumer<TKey, TMessage> consumer, LogMessage log)
    {
        LogLevel logLevel = log.Level.ToLogLevel();
        WriteLog(logLevel, () => BuildLogMessage(log));
    }

    /// <summary>
    ///Performs processing when an error occurs.
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="error"></param>
    private void OnError(IConsumer<TKey, TMessage> consumer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
        NotifyError(new Exception(error.Reason));
    }

    /// <summary>
    ///Performs processing when an exception occurs.
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="exception"></param>
    private void OnException(IConsumer<TKey, TMessage> consumer, Exception exception)
    {
        WriteLog(LogLevel.Critical, () => BuildLogMessage(exception), exception);
        NotifyError(exception);
    }

    #endregion

    #region notification

    /// <summary>
    ///Notifies the specified message.
    /// </summary>
    /// <param name="message">message</param>
    private void NotifyMessage(TMessage message)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnNext(message);
            }
        }
    }

    /// <summary>
    ///Notifies the specified exception.
    /// </summary>
    /// <param name="exception">exception</param>
    private void NotifyError(Exception exception)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnError(exception);
            }
        }
    }

    /// <summary>
    ///Notify the completion.
    /// </summary>
    private void NotifyComplated()
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnCompleted();
            }
        }
    }

    #endregion

    #region observer

    /// <summary>
    ///The subscription by the specified observer will start.
    /// </summary>
    /// <param name="observer"></param>
    /// <returns></returns>
    protected override IDisposable SubscribeCore(IObserver<TMessage> observer)
    {
        AddObserver(observer);
        return System.Reactive.Disposables.Disposable.Create(() => RemoveObserver(observer));
    }

    private readonly List<IObserver<TMessage>> m_Observers = new List<IObserver<TMessage>>();

    /// <summary>
    ///Adds the specified observer.
    /// </summary>
    /// <param name="observer"></param>
    private void AddObserver(IObserver<TMessage> observer)
    {
        lock (m_Observers)
        {
            m_Observers.Add(observer);
        }
    }

    /// <summary>
    ///Deletes the specified observer.
    /// </summary>
    /// <param name="observer"></param>
    private void RemoveObserver(IObserver<TMessage> observer)
    {
        //List<T>.Remove does nothing when the observer is absent,
        //so the check and the removal happen under one lock
        lock (m_Observers)
        {
            m_Observers.Remove(observer);
        }
    }

    #endregion

    #region logging

    /// <summary>
    ///Outputs the specified log.
    /// </summary>
    /// <param name="level">Log level</param>
    /// <param name="messageBuilder">Method to generate log message</param>
    /// <param name="exception">exception</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(ConsumeResult<TKey, TMessage> result)
    {
        return $"I received a message.[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(LogMessage log)
    {
        return log.Message;
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    private string BuildLogMessage(Exception exception)
    {
        return exception.Message;
    }

    #endregion
}

Checking the operation of the application

Send / receive control by consumer group and topic

Launch multiple publishers and subscribers to see how messages are sent and received.

Publisher behavior
Publisher1: publishes messages to "Topic-A".
Publisher2: publishes messages to "Topic-B".
Publisher3: publishes messages to "Topic-B".
Subscriber behavior
Subscriber1: subscribes to "Topic-A" as a consumer in consumer group "Group1".
Subscriber2: subscribes to "Topic-A" as a consumer in consumer group "Group1".
Subscriber3: subscribes to "Topic-A" as a consumer in consumer group "Group2".
Subscriber4: subscribes to "Topic-B" as a consumer in consumer group "Group1".

The "[Topic-A:0]" part of the log output shows the topic and the offset of the message within that topic (a serial number that is incremented each time a message is published).

Publisher1


Launched the publisher in process 5040.
Enter bootstrap servers (default 127.0.0.1):
Enter a topic (default test-topic):
Topic-A
The message sending process will start. Press Ctrl+C to exit.
11:33:13.426 [Debug]Issued a message.[Topic-A:0] Message 1 (process 5040)
11:33:18.453 [Debug]Issued a message.[Topic-A:1] Message 2 (process 5040)
11:33:23.491 [Debug]Issued a message.[Topic-A:2] Message 3 (process 5040)
11:33:28.508 [Debug]Issued a message.[Topic-A:3] Message 4 (process 5040)
11:33:33.530 [Debug]Issued a message.[Topic-A:4] Message 5 (process 5040)
11:33:38.546 [Debug]Issued a message.[Topic-A:5] Message 6 (process 5040)
11:33:43.565 [Debug]Issued a message.[Topic-A:6] Message 7 (process 5040)
11:33:48.595 [Debug]Issued a message.[Topic-A:7] Message 8 (process 5040)
(Rest omitted)

Two processes are publishing messages to Topic-B. You can see that the offset values are incremented without duplication.

Publisher2


Launched the publisher in process 15532.
Enter bootstrap servers (default 127.0.0.1):
Enter a topic (default test-topic):
Topic-B
The message sending process will start. Press Ctrl+C to exit.
11:33:21.558 [Debug]Issued a message.[Topic-B:0] Message 1 (process 15532)
11:33:26.592 [Debug]Issued a message.[Topic-B:1] Message 2 (process 15532)
11:33:31.621 [Debug]Issued a message.[Topic-B:2] Message 3 (process 15532)
11:33:36.640 [Debug]Issued a message.[Topic-B:4] Message 4 (process 15532)
11:33:41.653 [Debug]Issued a message.[Topic-B:6] Message 5 (process 15532)
11:33:46.672 [Debug]Issued a message.[Topic-B:8] Message 6 (process 15532)
11:33:51.687 [Debug]Issued a message.[Topic-B:10] Message 7 (process 15532)
11:33:56.714 [Debug]Issued a message.[Topic-B:12] Message 8 (process 15532)
(Rest omitted)

Publisher3


You launched the publisher in process 4928.
Enter bootstrap servers (default 127.0.0.1):
Enter a topic (default test-topic):
Topic-B
The message sending process will start. Press Ctrl+C to exit.
11:33:33.667 [Debug]Issued a message.[Topic-B:3]First message (process 4928)
11:33:38.698 [Debug]Issued a message.[Topic-B:5]Second message (process 4928)
11:33:43.718 [Debug]Issued a message.[Topic-B:7]Third message (process 4928)
11:33:48.738 [Debug]Issued a message.[Topic-B:9]Fourth message (process 4928)
11:33:53.764 [Debug]Issued a message.[Topic-B:11]5th message (process 4928)
11:33:58.790 [Debug]Issued a message.[Topic-B:13]6th message (process 4928)
11:34:03.800 [Debug]Issued a message.[Topic-B:15]7th message (process 4928)
11:34:08.809 [Debug]Issued a message.[Topic-B:17]8th message (process 4928)
(Omitted below)
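
The way the two publishers end up with unique, gap-free offsets on Topic-B can be pictured with a toy model. The sketch below is not Kafka and not the code used in this article: `TopicLog` and `publish` are hypothetical names, and I am assuming a single partition whose offset is simply the record's position in an append-only list. Two threads stand in for Publisher2 and Publisher3.

```python
import threading


class TopicLog:
    """Toy single-partition topic: an append-only list guarded by a lock.

    Hypothetical stand-in for a broker, used only to illustrate offsets.
    """

    def __init__(self, name):
        self.name = name
        self._records = []
        self._lock = threading.Lock()

    def append(self, value):
        # The offset is the position the record takes in the log.
        # The lock makes "read length, then append" one atomic step.
        with self._lock:
            offset = len(self._records)
            self._records.append(value)
            return offset

    def records(self):
        with self._lock:
            return list(self._records)


def publish(log, publisher, count, results):
    """Append `count` messages and remember which offsets were assigned."""
    for i in range(count):
        offset = log.append(f"message {i + 1} from {publisher}")
        results.append((publisher, offset))


topic_b = TopicLog("Topic-B")
results = []
threads = [
    threading.Thread(target=publish, args=(topic_b, name, 8, results))
    for name in ("Publisher2", "Publisher3")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every offset from 0 to 15 appears exactly once, whichever thread got it.
offsets = sorted(offset for _, offset in results)
print(offsets)
```

How the offsets are split between the two publishers depends on timing, just as in the logs above, but no offset is ever handed out twice.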

If you start two processes that subscribe to Topic-A messages in consumer group Group1, only one of them receives the messages. If you stop the subscriber that was receiving them, the other subscriber takes over and receives the messages from then on.

Subscriber1


You started the subscriber in process 11948.
Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group1
Enter a topic (default test-topic):
Topic-A
Message reception will start. Press Ctrl+C to exit.
11:33:13.493 [Debug]I received a message.[Topic-A:0]First message (process 5040)
11:33:18.450 [Debug]I received a message.[Topic-A:1]Second message (process 5040)
11:33:23.487 [Debug]I received a message.[Topic-A:2]Third message (process 5040)
11:33:28.510 [Debug]I received a message.[Topic-A:3]4th message (process 5040)
11:33:33.531 [Debug]I received a message.[Topic-A:4]5th message (process 5040)
11:33:38.547 [Debug]I received a message.[Topic-A:5]6th message (process 5040)
11:33:43.566 [Debug]I received a message.[Topic-A:6]7th message (process 5040)
11:33:48.595 [Debug]I received a message.[Topic-A:7]8th message (process 5040)
11:33:53.615 [Debug]I received a message.[Topic-A:8]9th message (process 5040)
11:33:58.633 [Debug]I received a message.[Topic-A:9]10th message (process 5040)
11:34:03.660 [Debug]I received a message.[Topic-A:10]11th message (process 5040)
11:34:08.685 [Debug]I received a message.[Topic-A:11]12th message (process 5040)
11:34:13.701 [Debug]I received a message.[Topic-A:12]13th message (process 5040)
11:34:18.719 [Debug]I received a message.[Topic-A:13]14th message (process 5040)
11:34:23.743 [Debug]I received a message.[Topic-A:14]15th message (process 5040)
11:34:28.765 [Debug]I received a message.[Topic-A:15]16th message (process 5040)
The message reception process has ended.

Subscriber2


You started the subscriber in process 13144.
Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group1
Enter a topic (default test-topic):
Topic-A
Message reception will start. Press Ctrl+C to exit.
11:34:42.843 [Debug]I received a message.[Topic-A:16]17th message (process 5040)
11:34:42.854 [Debug]I received a message.[Topic-A:17]18th message (process 5040)
11:34:43.810 [Debug]I received a message.[Topic-A:18]19th message (process 5040)
11:34:48.821 [Debug]I received a message.[Topic-A:19]20th message (process 5040)
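
One way to see why only one of the two Group1 subscribers receives messages is to think in terms of partition ownership: within a consumer group, each partition is read by at most one member at a time. The sketch below is a toy round-robin assignment, not Kafka's real assignor. The function name `assign` and the partition label "Topic-A-0" are hypothetical, and I am assuming that Topic-A has a single partition.

```python
def assign(partitions, members):
    """Spread partitions over group members round-robin.

    Toy stand-in for a group rebalance; real Kafka assignors are more
    elaborate, and which member wins a partition may differ.
    """
    if not members:
        return {}
    assignment = {member: [] for member in members}
    for index, partition in enumerate(partitions):
        owner = members[index % len(members)]
        assignment[owner].append(partition)
    return assignment


# Assumption: Topic-A has exactly one partition.
partitions = ["Topic-A-0"]

# Both subscribers are running: one owns the partition, the other is idle.
before = assign(partitions, ["Subscriber1", "Subscriber2"])

# Subscriber1 exits: the group is rebalanced and Subscriber2 takes over.
after = assign(partitions, ["Subscriber2"])

print(before)
print(after)
```

With more partitions than one, the same function would give both members something to read, which is how a group spreads load.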

A subscriber in a different consumer group receives all of the messages, independently of the other group.

Subscriber3


You started the subscriber in process 11192.
Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group2
Enter a topic (default test-topic):
Topic-A
Message reception will start. Press Ctrl+C to exit.
11:33:13.493 [Debug]I received a message.[Topic-A:0]First message (process 5040)
11:33:18.451 [Debug]I received a message.[Topic-A:1]Second message (process 5040)
11:33:23.484 [Debug]I received a message.[Topic-A:2]Third message (process 5040)
11:33:28.511 [Debug]I received a message.[Topic-A:3]4th message (process 5040)
11:33:33.530 [Debug]I received a message.[Topic-A:4]5th message (process 5040)
11:33:38.547 [Debug]I received a message.[Topic-A:5]6th message (process 5040)
11:33:43.565 [Debug]I received a message.[Topic-A:6]7th message (process 5040)
11:33:48.594 [Debug]I received a message.[Topic-A:7]8th message (process 5040)
11:33:53.616 [Debug]I received a message.[Topic-A:8]9th message (process 5040)
11:33:58.633 [Debug]I received a message.[Topic-A:9]10th message (process 5040)
11:34:03.661 [Debug]I received a message.[Topic-A:10]11th message (process 5040)
11:34:08.684 [Debug]I received a message.[Topic-A:11]12th message (process 5040)
11:34:13.702 [Debug]I received a message.[Topic-A:12]13th message (process 5040)
11:34:18.717 [Debug]I received a message.[Topic-A:13]14th message (process 5040)
11:34:23.743 [Debug]I received a message.[Topic-A:14]15th message (process 5040)
11:34:28.765 [Debug]I received a message.[Topic-A:15]16th message (process 5040)
11:34:33.786 [Debug]I received a message.[Topic-A:16]17th message (process 5040)
11:34:38.797 [Debug]I received a message.[Topic-A:17]18th message (process 5040)
11:34:43.809 [Debug]I received a message.[Topic-A:18]19th message (process 5040)
11:34:48.820 [Debug]I received a message.[Topic-A:19]20th message (process 5040)
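
The reason Group2 sees every message even though Group1 already consumed them is that the read position is tracked per consumer group, not per topic. The following is a minimal in-memory sketch of that idea. `ToyBroker`, `publish`, and `poll` are hypothetical names, and I am assuming a single partition and that a group with no stored position starts from offset 0.

```python
class ToyBroker:
    """Toy broker for one single-partition topic.

    Keeps one shared log and a separate read position for each group.
    """

    def __init__(self):
        self.log = []        # records, indexed by offset
        self.committed = {}  # group id -> next offset to deliver

    def publish(self, value):
        self.log.append(value)
        return len(self.log) - 1

    def poll(self, group_id):
        # Assumption: an unknown group starts from the beginning of the log.
        start = self.committed.get(group_id, 0)
        batch = [(offset, self.log[offset]) for offset in range(start, len(self.log))]
        # Remember how far this group has read; other groups are unaffected.
        self.committed[group_id] = len(self.log)
        return batch


broker = ToyBroker()
for text in ("First message", "Second message", "Third message"):
    broker.publish(text)

group1_first = broker.poll("Group1")   # offsets 0, 1, 2
group2_first = broker.poll("Group2")   # offsets 0, 1, 2 again, for the other group

broker.publish("4th message")
group1_second = broker.poll("Group1")  # only offset 3

print(group1_first)
print(group2_first)
print(group1_second)
```

Reading by Group1 does not remove anything from the log, so Group2 gets its own complete copy of the stream.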

Messages are issued to Topic-B from two processes. The subscriber receives them in the order they were issued, that is, in offset order.

Subscriber4


You started the subscriber in process 13912.
Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group1
Enter a topic (default test-topic):
Topic-B
Message reception will start. Press Ctrl+C to exit.
11:33:21.625 [Debug]I received a message.[Topic-B:0]First message (process 15532)
11:33:26.589 [Debug]I received a message.[Topic-B:1]Second message (process 15532)
11:33:31.623 [Debug]I received a message.[Topic-B:2]Third message (process 15532)
11:33:33.650 [Debug]I received a message.[Topic-B:3]First message (process 4928)
11:33:36.639 [Debug]I received a message.[Topic-B:4]Fourth message (process 15532)
11:33:38.698 [Debug]I received a message.[Topic-B:5]Second message (process 4928)
11:33:41.655 [Debug]I received a message.[Topic-B:6]5th message (process 15532)
11:33:43.718 [Debug]I received a message.[Topic-B:7]Third message (process 4928)
11:33:46.675 [Debug]I received a message.[Topic-B:8]6th message (process 15532)
11:33:48.735 [Debug]I received a message.[Topic-B:9]Fourth message (process 4928)
11:33:51.689 [Debug]I received a message.[Topic-B:10]7th message (process 15532)
(Omitted below)

Guarantee of message arrival

Verify that messages issued while no subscriber is running can still be received later.

Publisher behavior
Publisher1 Issues messages to "Topic-A".
Subscriber behavior
Subscriber1 Starts after Publisher1 has issued some messages. Exits after receiving some messages.
Subscriber2 Starts after Subscriber1 has exited and Publisher1 has issued some more messages.

The Topic-A offset starts at 20 because the Kafka instance has been kept running since the previous test.

Publisher1


You launched the publisher in process 5096.
Enter bootstrap servers (default 127.0.0.1):
Enter a topic (default test-topic):
Topic-A
The message sending process will start. Press Ctrl+C to exit.
12:21:50.114 [Debug]Issued a message.[Topic-A:20]First message (process 5096)
12:21:55.149 [Debug]Issued a message.[Topic-A:21]Second message (process 5096)
12:22:00.164 [Debug]Issued a message.[Topic-A:22]Third message (process 5096)
12:22:05.183 [Debug]Issued a message.[Topic-A:23]4th message (process 5096)
12:22:10.201 [Debug]Issued a message.[Topic-A:24]5th message (process 5096)
12:22:15.215 [Debug]Issued a message.[Topic-A:25]6th message (process 5096)
12:22:20.239 [Debug]Issued a message.[Topic-A:26]7th message (process 5096)
12:22:25.255 [Debug]Issued a message.[Topic-A:27]8th message (process 5096)
12:22:30.280 [Debug]Issued a message.[Topic-A:28]9th message (process 5096)
12:22:35.295 [Debug]Issued a message.[Topic-A:29]10th message (process 5096)
12:22:40.311 [Debug]Issued a message.[Topic-A:30]11th message (process 5096)
12:22:45.327 [Debug]Issued a message.[Topic-A:31]12th message (process 5096)
12:22:50.344 [Debug]Issued a message.[Topic-A:32]13th message (process 5096)
12:22:55.360 [Debug]Issued a message.[Topic-A:33]14th message (process 5096)

Immediately after the subscriber starts, the messages held in Kafka that have not yet been received (offsets 20 and 21) are delivered all at once.

Subscriber1


You started the subscriber in process 17640.
Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group1
Enter a topic (default test-topic):
Topic-A
Message reception will start. Press Ctrl+C to exit.
12:21:58.386 [Debug]I received a message.[Topic-A:20]First message (process 5096)
12:21:58.397 [Debug]I received a message.[Topic-A:21]Second message (process 5096)
12:22:00.165 [Debug]I received a message.[Topic-A:22]Third message (process 5096)
12:22:05.182 [Debug]I received a message.[Topic-A:23]4th message (process 5096)
12:22:10.202 [Debug]I received a message.[Topic-A:24]5th message (process 5096)
The message reception process has ended.

Immediately after the subscriber starts, the messages held in Kafka that have not yet been received (offsets 25, 26, 27, and 28) are delivered all at once.

Subscriber2


Enter bootstrap servers (default 127.0.0.1):
Enter the Consumer Group ID (default test-group):
Group1
Enter a topic (default test-topic):
Topic-A
Message reception will start. Press Ctrl+C to exit.
12:22:32.537 [Debug]I received a message.[Topic-A:25]6th message (process 5096)
12:22:32.548 [Debug]I received a message.[Topic-A:26]7th message (process 5096)
12:22:32.550 [Debug]I received a message.[Topic-A:27]8th message (process 5096)
12:22:32.551 [Debug]I received a message.[Topic-A:28]9th message (process 5096)
12:22:35.298 [Debug]I received a message.[Topic-A:29]10th message (process 5096)
12:22:40.314 [Debug]I received a message.[Topic-A:30]11th message (process 5096)
12:22:58.543 [Debug]I received a message.[Topic-A:31]12th message (process 5096)
12:22:58.551 [Debug]I received a message.[Topic-A:32]13th message (process 5096)
12:22:58.560 [Debug]I received a message.[Topic-A:33]14th message (process 5096)
The message reception process has ended.
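
The pattern in the two subscriber logs above (a burst of backlog on startup, then live delivery) follows from simple offset arithmetic. The sketch below replays it with the offsets read from the logs. The helper name `session` is hypothetical, and I am assuming that Group1's stored position was 20 at the start (it had read through offset 19 in the previous test) and that each subscriber saves its position for everything it received before exiting.

```python
def session(committed, end_at_start, end_at_stop):
    """Model one subscriber run against a single-partition topic.

    committed     -- next offset the group still has to read
    end_at_start  -- number of records in the log when the subscriber starts
    end_at_stop   -- number of records in the log when the subscriber stops
    Returns (backlog, live, new_committed).
    """
    # Records published while nobody was listening arrive together on startup.
    backlog = list(range(committed, end_at_start))
    # Records published during the run arrive one by one as they are issued.
    live = list(range(end_at_start, end_at_stop))
    return backlog, live, end_at_stop


# Subscriber1: starts when offsets 20 and 21 exist, stops after offset 24.
backlog1, live1, committed = session(20, 22, 25)

# Subscriber2: starts when offsets up to 28 exist, stops after offset 33.
backlog2, live2, committed = session(committed, 29, 34)

print(backlog1, live1)
print(backlog2, live2)
```

The backlog lists match the bursts in the logs: offsets 20 and 21 for Subscriber1, and offsets 25 through 28 for Subscriber2.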

Summary

I was able to implement Pub/Sub easily. Some features needed for production use are still missing, but I plan to use this setup to verify how partitions and replicas behave.
