Kopieren Sie den Consumer-Offset mit kafka-python auf einen anderen

Bei der Implementierung von Kafka Consumer verwende ich häufig Sprachen wie Java und Scala, aber ich möchte Python verwenden, das leicht implementiert werden kann, wenn es nicht so kompliziert ist. Dieses Mal möchte ich einen Wartungsprozess schreiben, der den Offset eines Verbrauchers mit kafka-python der Apache Kafka-Bibliothek für Python auf einen anderen Verbraucher verschiebt.

Holen Sie sich Verbraucher Offset

Dieses Mal werden wir KafkaAdminClient verwenden. Sie können es von KafkaConsumer erhalten, aber es ist kompliziert, den Verbraucher zu abonnieren und abzufragen. Daher wird empfohlen, diesen Client zu verwenden.

from kafka import KafkaAdminClient

#Angenommen, consumer01 abonniert topic01
target_consumer_name = "consumer01"

#Holen Sie sich KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

#Versatz abrufen
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)

Als Referenz wird so etwas zurückgegeben. Versatzinformationen mit der TopicPartition-Klasse als Schlüssel.

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}

Schreiben Sie den Offset an einen anderen Verbraucher

Verwenden Sie Kafka Consumer, um den Verbraucher neu zu schreiben. Schreiben Sie die zuvor erhaltenen Offset-Informationen an consumer02. Wenn der Verbraucher nicht vorhanden ist, wird er automatisch erstellt.

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

#Versatzinformationen schreiben (erstellt, wenn kein Verbraucher vorhanden ist)
consumer.commit(offsets)

Die Offset-Informationen desselben Themas wie consumer01 werden jetzt in consumer02 geschrieben. Wenn Sie Ihr Abonnement mit consumer02 starten, sollten Sie Ihr Abonnement mit demselben Offset wie consumer01 starten, wenn Sie den Offset erhalten haben.

Abonnementbeispiel

consumer_group_name='consumer02'

#Holen Sie sich Verbraucher
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

#Wechseln Sie zum Abonnieren von topic01
consumer.subscribe(topics=['topic01'])

#Hat tatsächlich an topic01 consumer02 teilgenommen
consumer.poll()

#Empfangene Nachrichten weiter drucken
for msg in consumer:
    print(msg)

Referenz

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

Recommended Posts

Kopieren Sie den Consumer-Offset mit kafka-python auf einen anderen
Ich wollte unbedingt mit Selen kopieren
Übertragen Sie Daten mit Streamlit auf eine andere Seite (Registerkarte)
Konvertieren Sie 202003 bis 2020-03 mit Pandas
Ich möchte eine andere Version von Python mit pyvenv angeben
Herausforderung Problem 5 mit Python: Lambda ... Ich habe mich entschieden, ohne zu kopieren
"Tiefe Kopie" und "flache Kopie", um mit dem kleinsten Beispiel zu verstehen
Nachricht vom ersten Offset mit Kafka Consumer in Python abrufen