décalage du consommateur kafka-copy vers un autre avec python

Lors de l'implémentation du consommateur Kafka, j'utilise souvent des langages tels que Java et Scala, mais j'aimerais utiliser Python, qui peut être implémenté légèrement si ce n'est pas si compliqué. Cette fois, j'aimerais écrire un processus de maintenance qui déplace le décalage d'un consommateur vers un autre consommateur en utilisant kafka-python de la bibliothèque Apache Kafka pour Python.

Obtenez une compensation client

Cette fois, nous utiliserons KafkaAdminClient. Vous pouvez l'obtenir auprès de KafkaConsumer, mais il est compliqué de s'abonner et d'interroger le consommateur, il est donc recommandé d'utiliser ce client.

from kafka import KafkaAdminClient

#En supposant que consumer01 s'abonne à topic01
target_consumer_name = "consumer01"

#Obtenez KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

#Obtenir un décalage
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)

Pour référence, quelque chose comme celui-ci sera retourné. Décalez les informations avec la classe TopicPartition comme clé.

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}

Ecrire le décalage sur un autre consommateur

Utilisez Kafka Consumer pour réécrire le consommateur. Écrivez les informations de décalage obtenues précédemment dans consumer02. Si le consommateur n'existe pas, il sera créé automatiquement.

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

#Écrire les informations de décalage (créées si le consommateur n'existe pas)
consumer.commit(offsets)

Les informations de décalage de la même rubrique que consumer01 sont désormais écrites dans consumer02. Si vous démarrez votre abonnement à l'aide de consumer02, vous devez démarrer votre abonnement à partir du même décalage que consumer01 lorsque vous avez obtenu le décalage.

Exemple d'abonnement

consumer_group_name='consumer02'

#Obtenez le consommateur
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

#Changer pour s'abonner topic01
consumer.subscribe(topics=['topic01'])

#Participé au topic01 consumer02
consumer.poll()

#Continuer à imprimer les messages reçus
for msg in consumer:
    print(msg)

référence

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

Recommended Posts

décalage du consommateur kafka-copy vers un autre avec python
Je voulais vraiment copier avec du sélénium
Transférer les données vers une autre page (onglet) avec streamlit
Convertir 202003 en 2020-03 avec les pandas
Je souhaite spécifier une autre version de Python avec pyvenv
Défi problème 5 avec Python: lambda ... j'ai décidé de copier sans
"Copie profonde" et "Copie superficielle" à comprendre avec le plus petit exemple
Obtenir le message du premier offset avec le consommateur kafka en python