Copy a consumer group's offsets to another group with kafka-python

When implementing Kafka consumers, I usually use Java or Scala. For tasks that are not very complicated, though, I prefer Python, which lets me write them with little code. This time I will write a maintenance script that copies the committed offsets of one consumer group to another, using kafka-python, a Python client library for Apache Kafka.

Get consumer offset

This time we will use KafkaAdminClient. The offsets can also be obtained through KafkaConsumer, but that involves subscribing and polling, so KafkaAdminClient is the simpler choice.

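from kafka import KafkaAdminClient

# Broker address list (example value; replace with your cluster's brokers)
bootstrap_servers = "localhost:9092"

# Consumer group whose offsets we copy; assumed to be consuming topic01
target_consumer_name = "consumer01"

# Create a KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Get the group's committed offsets as {TopicPartition: OffsetAndMetadata}
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)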

For reference, the result looks like the following: a dict that maps each TopicPartition to the group's committed offset for that partition, wrapped in an OffsetAndMetadata.

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
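Because the result is an ordinary dict, it can be processed with plain Python. The sketch below uses a hypothetical helper, format_offsets, that lists each partition's offset. So that it runs without a broker, it defines stand-in namedtuples whose fields follow the output above; with kafka-python, pass the dict returned by list_consumer_group_offsets instead.

```python
from collections import namedtuple

# Stand-ins mirroring the fields in the output above, so this sketch runs
# without a broker. kafka-python's own classes live in kafka.structs.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def format_offsets(offsets):
    """Return one 'topic[partition]: offset' line per partition,
    sorted by topic and partition."""
    return [
        f"{tp.topic}[{tp.partition}]: {om.offset}"
        for tp, om in sorted(offsets.items())
    ]


# Example data taken from the output above (metadata omitted).
example = {
    TopicPartition("topic01", 0): OffsetAndMetadata(14475, ""),
    TopicPartition("topic01", 1): OffsetAndMetadata(14494, ""),
}
for line in format_offsets(example):
    print(line)
```

This prints topic01[0]: 14475 and topic01[1]: 14494, which is a quick way to eyeball what is about to be copied.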

Write offset to another consumer

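Next, use KafkaConsumer to commit the offsets obtained above under consumer02. If the consumer group consumer02 does not exist yet, it is created automatically. Run this while no consumer02 instances are running, because the broker generally rejects commits made from outside a group while that group has active members.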

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

# Commit the offsets read from consumer01 (the group is created if it does not exist)
consumer.commit(offsets)

# Close the consumer; with auto-commit disabled, nothing else is committed
consumer.close()
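Because this is a one-off maintenance task, it may be convenient to wrap the two steps above in one function that always closes its clients. The following is a minimal sketch: the function name copy_group_offsets and its parameters are hypothetical, and it uses only the kafka-python calls already shown (list_consumer_group_offsets, commit) plus close().

```python
def copy_group_offsets(bootstrap_servers, source_group, target_group):
    """Copy the committed offsets of source_group to target_group.

    Returns the {TopicPartition: OffsetAndMetadata} dict that was committed,
    or an empty dict if source_group had no committed offsets.
    """
    # Imported inside the function so the sketch can be loaded even where
    # kafka-python is not installed; a module-level import works as well.
    from kafka import KafkaAdminClient, KafkaConsumer

    # Step 1: read the source group's committed offsets.
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        offsets = admin.list_consumer_group_offsets(source_group)
    finally:
        admin.close()

    if not offsets:
        # Nothing to copy; avoid creating a consumer for the target group.
        return {}

    # Step 2: commit the same offsets under the target group's ID.
    # Auto-commit is disabled so closing the consumer commits nothing else.
    consumer = KafkaConsumer(
        group_id=target_group,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False,
    )
    try:
        consumer.commit(offsets)
    finally:
        consumer.close()
    return offsets
```

Called as copy_group_offsets("localhost:9092", "consumer01", "consumer02"), where the broker address is only an example, it performs the same copy as the snippets above.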

After this commit, consumer02 holds the same committed offsets for topic01 that consumer01 had when they were read. When consumer02 subscribes to topic01, it should therefore resume from those offsets.
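To check the result, both groups' offsets can be read with list_consumer_group_offsets and compared. The sketch below uses two hypothetical helpers: to_plain_offsets flattens the returned dict to {(topic, partition): offset}, and offset_mismatches reports the partitions whose offsets differ. If consumer01 kept consuming after its offsets were read, some differences are expected.

```python
def to_plain_offsets(offsets):
    """Convert {TopicPartition: OffsetAndMetadata} to {(topic, partition): offset}."""
    return {(tp.topic, tp.partition): om.offset for tp, om in offsets.items()}


def offset_mismatches(source, target):
    """Compare two {(topic, partition): offset} dicts.

    Returns {(topic, partition): (source_offset, target_offset)} for every
    partition whose offsets differ; None marks a partition missing on one side.
    """
    return {
        key: (source.get(key), target.get(key))
        for key in source.keys() | target.keys()
        if source.get(key) != target.get(key)
    }
```

For example, offset_mismatches(to_plain_offsets(cluster_admin.list_consumer_group_offsets("consumer01")), to_plain_offsets(cluster_admin.list_consumer_group_offsets("consumer02"))) returns an empty dict when the two groups match.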

Subscription example

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

# Create a consumer in the consumer02 group
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

# Subscribe to topic01
consumer.subscribe(topics=['topic01'])

# Iterating the consumer joins the consumer02 group and starts fetching
# from its committed offsets; print each received message
for msg in consumer:
    print(msg)

Reference

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
