Bei der Implementierung von Kafka Consumer verwende ich häufig Sprachen wie Java und Scala, aber ich möchte Python verwenden, das leicht implementiert werden kann, wenn es nicht so kompliziert ist. Dieses Mal möchte ich einen Wartungsprozess schreiben, der den Offset eines Verbrauchers mit kafka-python der Apache Kafka-Bibliothek für Python auf einen anderen Verbraucher verschiebt.
Dieses Mal werden wir KafkaAdminClient verwenden. Sie können es von KafkaConsumer erhalten, aber es ist kompliziert, den Verbraucher zu abonnieren und abzufragen. Daher wird empfohlen, diesen Client zu verwenden.
from kafka import KafkaAdminClient
#Angenommen, consumer01 abonniert topic01
target_consumer_name = "consumer01"
#Holen Sie sich KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
#Versatz abrufen
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)
Als Referenz wird so etwas zurückgegeben. Versatzinformationen mit der TopicPartition-Klasse als Schlüssel.
{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
Verwenden Sie Kafka Consumer, um den Verbraucher neu zu schreiben. Schreiben Sie die zuvor erhaltenen Offset-Informationen an consumer02. Wenn der Verbraucher nicht vorhanden ist, wird er automatisch erstellt.
from kafka import KafkaConsumer
consumer_group_name = 'consumer02'
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers,
enable_auto_commit=False)
#Versatzinformationen schreiben (erstellt, wenn kein Verbraucher vorhanden ist)
consumer.commit(offsets)
Die Offset-Informationen desselben Themas wie consumer01 werden jetzt in consumer02 geschrieben. Wenn Sie Ihr Abonnement mit consumer02 starten, sollten Sie Ihr Abonnement mit demselben Offset wie consumer01 starten, wenn Sie den Offset erhalten haben.
consumer_group_name='consumer02'
#Holen Sie sich Verbraucher
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers)
#Wechseln Sie zum Abonnieren von topic01
consumer.subscribe(topics=['topic01'])
#Hat tatsächlich an topic01 consumer02 teilgenommen
consumer.poll()
#Empfangene Nachrichten weiter drucken
for msg in consumer:
print(msg)
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
Recommended Posts