[PYTHON] Didacticiel RabbitMQ 2 (file d'attente de travail)

Tutoriel Rabbit MQ 2 https://www.rabbitmq.com/tutorials/tutorial-two-python.html C'est une traduction de. Nous sommes impatients de signaler toute erreur de traduction.

Conditions préalables

Ce didacticiel suppose que RabbitMQ est installé et en cours d'exécution sur le port standard (5672) sur l'hôte local. Si vous souhaitez utiliser un hôte, un port ou des informations d'identification différents, vous devez ajuster vos paramètres de connexion.

Si un problème survient

Si vous rencontrez des problèmes grâce à ce tutoriel, vous pouvez nous contacter via la liste de diffusion.

File d'attente de travail

(Utilisation du client Python Pika 0.9.8)

Dans le premier tutoriel, j'ai écrit un programme pour envoyer et recevoir des messages à partir d'une file d'attente nommée. Cela crée une file d'attente de travail qui est utilisée pour répartir les tâches chronophages entre plusieurs travailleurs.

L'idée principale derrière les files d'attente de travail (communément appelées files d'attente de tâches) est d'exécuter immédiatement les tâches gourmandes en ressources et d'éviter d'attendre qu'elles se terminent. Au lieu de cela, planifiez la tâche pour qu'elle s'exécute plus tard. Encapsulez la tâche sous forme de message et envoyez-la à la file d'attente. Un processus de travail s'exécutant en arrière-plan récupère la tâche et exécute finalement le travail. Lorsque vous exécutez de nombreux nœuds de calcul, les tâches sont partagées entre eux.

Ce concept est particulièrement utile dans les applications Web, où il est impossible de gérer des tâches complexes lors d'une courte requête HTTP.

Préparation

Dans la partie précédente de ce tutoriel, nous avons envoyé un message contenant "Hello World!". Nous sommes sur le point d'envoyer une chaîne qui représente une tâche complexe. Il n'a pas de tâches du monde réel comme le redimensionnement d'images ou les fichiers PDF rendus, alors faisons semblant d'être occupé, en utilisant la fonction time.sleep () fais le. Montrons la complexité par le nombre de points dans la chaîne, disons que chaque point occupe une seconde de "travail". Par exemple, une fausse tâche marquée "Bonjour ..." prend 3 secondes.

Modifiez légèrement le code send.py de l'exemple précédent afin que tout message soit envoyé à partir de la ligne de commande. Ce programme planifie les tâches dans la file d'attente de travail, appelons-le new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

Le script receive.py précédent nécessite également quelques modifications. Cela nécessite un faux travail d'une seconde pour chaque point contenu dans le corps du message. Appelons ce worker.py car il prendra le message de la file d'attente et effectuera la tâche:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

Distribution à la ronde

L'un des avantages de l'utilisation des files d'attente de tâches est que vous pouvez facilement paralléliser votre travail. La création d'un arriéré de travail vous permet d'ajouter plus de travailleurs et donc d'évoluer facilement.

Tout d'abord, exécutons deux scripts worker.py en même temps. Ils reçoivent des messages des deux files d'attente, mais comment exactement? Nous allons jeter un coup d'oeil.

Vous devez ouvrir trois consoles. Les deux exécutent le script worker.py. Ces consoles seront deux consommateurs, "C1" et "C2".

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Publiez une nouvelle tâche dans la troisième console. Après avoir démarré le consommateur, vous pouvez publier certains messages.

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

Voyons s'il est livré aux travailleurs:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

Par défaut, RabbitMQ envoie chaque message au consommateur suivant dans la colonne. En moyenne, tous les consommateurs reçoivent le même nombre de messages. Cette méthode de remise des messages s'appelle Round Robin. Essayez ceci avec 3 ouvriers ou plus.

Accusé de réception du message

La tâche peut prendre quelques secondes. Vous vous demandez peut-être ce qui se passe si l'un des consommateurs commence une longue tâche, la termine partiellement et meurt. Dans le code actuel, RabbitMQ délivre le message au client une fois et le supprime immédiatement de la mémoire. Dans ce cas, si vous tuez le worker, vous perdrez le message en cours de traitement. Vous perdrez également tous les messages qui ont été envoyés à un travailleur particulier mais qui n'ont pas encore été traités.

Mais je ne veux pas perdre toutes les tâches. Si un travailleur meurt, je souhaite que la tâche soit confiée à d'autres travailleurs.

Pour garantir que vos messages ne sont jamais perdus, RabbitMQ prend en charge les accusés de réception de message. L'accusé de réception (nowledgement) est retourné par le consommateur pour informer RabbitMQ qu'un message particulier a été reçu et traité, et RabbitMQ est libre de le supprimer.

Si un consommateur décède sans envoyer d'accusé de réception, RabbitMQ se rend compte que le message n'a pas été complètement traité et le redistribue à un autre consommateur. De cette façon, vous pouvez être sûr que vos messages ne seront pas perdus si le travailleur meurt occasionnellement.

Il n'y a pas de délai d'expiration du message. RabbitMQ redistribue le message uniquement si la connexion de travail s'éteint. Le traitement du message peut prendre très, très longtemps.

L'accusé de réception de message est activé par défaut. Dans l'exemple précédent, il a été explicitement désactivé via l'indicateur no_ack = True. Supprimez cet indicateur et laissez le travailleur envoyer l'accusé de réception approprié lorsque la tâche est terminée.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Avec ce code, vous pouvez utiliser CTRL + C lors du traitement d'un message pour vous assurer que tuer un worker ne perd rien. Immédiatement après la mort du travailleur, tous les messages sans réponse sont redistribués.

Reconnaissance oubliée

    basic_Oublier d'acquitter est une erreur courante. C'est une simple erreur, mais le résultat est grave. Le message est renvoyé lorsque le client quitte (bien que cela puisse ressembler à une nouvelle livraison aléatoire), mais il n'est plus possible de libérer un message non empaqueté et RabbitMQ utilise plus de mémoire Viendra faire.

Pour déboguer ce type d'erreur, les messages_Vous pouvez utiliser rabbitmqctl pour afficher le champ non acquitté:

    $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello    0       0
    ...done.

Durabilité des messages

Vous avez appris à vous assurer que vos tâches ne sont pas perdues si le consommateur décède. Cependant, même si le serveur RabbitMQ tombe en panne, le travail est toujours perdu.

Si RabbitMQ se ferme ou plante, vous oubliez vos files d'attente et vos messages à moins que vous ne leur disiez de ne pas le faire. Pour garantir que les messages ne sont pas perdus, deux choses sont nécessaires: la file d'attente et le message doivent être marqués comme durables.

Tout d'abord, vous devez empêcher RabbitMQ de perdre la file d'attente. Par conséquent, il doit être déclaré comme * durable *:

channel.queue_declare(queue='hello', durable=True)

Cette commande est correcte en soi, mais elle ne fonctionne pas dans notre configuration. Cela est dû au fait que la file d'attente appelée hello est déjà définie comme non durable. RabbitMQ ne peut pas redéfinir une file d'attente existante avec des paramètres différents et renverra une erreur à tout programme qui tente de le faire. Mais il existe une solution de contournement rapide, définissons la file d'attente avec un nom différent, par exemple task_queue:

channel.queue_declare(queue='task_queue', durable=True)

Cette modification de queue_declare doit être appliquée à la fois au code producteur et consommateur.

Vous pouvez maintenant voir que la file d'attente task_queue n'est pas perdue lorsque RabbitMQ redémarre. Ensuite, marquons le message comme persistant, en donnant à delivery_mode une valeur de 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
Remarques sur la persistance des messages

Marquer un message comme persistant ne garantit pas que le message ne sera jamais perdu. Il dit à RabbitMQ d'enregistrer le message sur le disque, mais il y a encore un court laps de temps entre RabbitMQ acceptant le message et ne pas encore l'enregistrer. RabbitMQ effectue également une synchronisation pour chaque message(2)Ne fonctionne pas, il peut simplement être mis en cache et ne pas être réellement écrit sur le disque. La garantie de persistance n'est pas forte, mais elle est plus que suffisante pour une simple file d'attente de tâches. Si vous avez besoin d'une garantie solide, vous pouvez utiliser la confirmation de l'éditeur.

Expédition équitable

Vous avez peut-être remarqué que l'envoi ne fonctionne toujours pas comme nous le souhaitons. Par exemple, si vous avez deux travailleurs et que tous les messages impairs sont lourds et les messages pairs sont légers, un travailleur sera toujours occupé et l'autre fera peu de travail. RabbitMQ n'en sait rien et continue d'envoyer les messages de manière uniforme.

Cela se produit parce que RabbitMQ distribue simplement le message lorsqu'il est mis en file d'attente. Je n'ai pas vu le nombre de messages non confirmés pour les consommateurs. Envoyez simplement aveuglément tous les nièmes messages au nième consommateur.

Pour éviter cela, vous pouvez utiliser la méthode basic.qos avec le paramètre prefetch_count = 1. Cela indique à RabbitMQ de ne pas donner plusieurs messages à un travailleur à la fois. En d'autres termes, il n'envoie pas de nouveau message au travailleur tant que celui-ci n'a pas traité le message précédent et l'a reconnu. À la place, envoyez-le au prochain travailleur qui n'est pas encore occupé.

channel.basic_qos(prefetch_count=1)
Remarque sur la taille de la file d'attente

La file d'attente peut se remplir si tous les travailleurs sont en cours d'utilisation. Dans de tels cas, vous devez ajouter plus de travailleurs ou adopter d'autres stratégies.

Tout résumé

Code final pour le script new_task.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='task_queue', durable=True)
10
11    message = ' '.join(sys.argv[1:]) or "Hello World!"
12    channel.basic_publish(exchange='',
13                          routing_key='task_queue',
14                          body=message,
15                          properties=pika.BasicProperties(
16                             delivery_mode = 2, # make message persistent
17                          ))
18    print " [x] Sent %r" % (message,)
19    connection.close()

Et travailleur:

 1    #!/usr/bin/env python
 2    import pika
 3    import time
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='task_queue', durable=True)
10    print ' [*] Waiting for messages. To exit press CTRL+C'
11
12    def callback(ch, method, properties, body):
13        print " [x] Received %r" % (body,)
14        time.sleep( body.count('.') )
15        print " [x] Done"
16        ch.basic_ack(delivery_tag = method.delivery_tag)
17
18    channel.basic_qos(prefetch_count=1)
19    channel.basic_consume(callback,
20                          queue='task_queue')
21
22    channel.start_consuming()

Vous pouvez utiliser l'accusé de réception de message et prefetch_count pour configurer une file d'attente de travail. L'option de durabilité garantit que la tâche survit même si le Rabbit MQ est redémarré.

Vous pouvez maintenant passer au didacticiel 3 et apprendre à transmettre le même message à de nombreux consommateurs.

Recommended Posts

Didacticiel RabbitMQ 2 (file d'attente de travail)
Tutoriel RabbitMQ 5 (sujet)
Tutoriel RabbitMQ 6 (RPC)
Tutoriel RabbitMQ 4 (Routage)
Tutoriel RabbitMQ 3 (Publier / S'abonner)
Tutoriel RabbitMQ 1 ("Hello World!")
queue