[PYTHON] Tutoriel RabbitMQ 6 (RPC)

Tutoriel Rabbit MQ 6 https://www.rabbitmq.com/tutorials/tutorial-six-python.html C'est une traduction de. Nous sommes impatients de signaler toute erreur de traduction.

Conditions préalables

Ce didacticiel suppose que RabbitMQ est installé et en cours d'exécution sur le port standard (5672) sur l'hôte local. Si vous souhaitez utiliser un hôte, un port ou des informations d'identification différents, vous devez ajuster vos paramètres de connexion.

Si un problème survient

Si vous rencontrez des problèmes grâce à ce tutoriel, vous pouvez nous contacter via la liste de diffusion.

Appel de procédure à distance (RPC)

(Utilisation du client Python Pika 0.9.8)

Dans le deuxième didacticiel, vous avez appris à utiliser les files d'attente de travail pour répartir les tâches chronophages entre plusieurs travailleurs.

Mais que faire si vous devez exécuter une fonction sur un ordinateur distant et attendre le résultat? C'est une autre histoire. Ce modèle est communément appelé appel de procédure distante ou RPC.

Ce didacticiel utilise RabbitMQ pour créer un système RPC (un client et un serveur RPC évolutif). Nous n'avons pas de tâche fastidieuse pour fournir des valeurs, nous allons donc créer un service RPC factice qui renvoie le nombre de Fibonacci.

Interface client

Créez une classe client simple pour illustrer l'utilisation du service RPC. Cette classe expose une méthode nommée "call" qui envoie une requête RPC et la bloque jusqu'à ce qu'elle reçoive une réponse:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
Remarques sur RPC

RPC est un modèle assez courant en informatique, mais il est souvent critiqué. Le problème se produit lorsque le programmeur ne sait pas si l'appel de fonction est local ou s'il s'agit d'un RPC lent. Une telle confusion a pour conséquence d'ajouter une complexité inutile aux systèmes et au débogage imprévisibles. Au lieu de simplifier le logiciel, une mauvaise utilisation de RPC entraîne un code spaghetti non maintenable.

Considérez les conseils suivants dans votre esprit:

Assurez-vous qu'il est clair quels appels de fonction sont locaux et lesquels sont distants.
Documentez votre système. Clarifiez les dépendances entre les composants.
Gérez les cas d'erreur. Comment le client doit-il réagir lorsque le serveur RPC est arrêté pendant une période prolongée?

Évitez RPC si vous avez des doutes. Si possible, vous devez utiliser un pipeline asynchrone au lieu d'une méthode de blocage telle que RPC, et les résultats seront transférés de manière asynchrone à l'étape de calcul suivante.

File d'attente de rappel

En général, RPC avec Rabbit MQ est facile. Lorsque le client envoie un message de demande, le serveur répond par un message de réponse. Pour recevoir la réponse, le client doit envoyer la requête à l'adresse de la file d'attente "callback". Essayons:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...
Propriétés du message

Le protocole AMQP définit 14 propriétés qui sont portées avec le message. La plupart des propriétés sont rarement utilisées, avec les exceptions suivantes:

delivery_mode:Marquez le message comme permanent (valeur 2) ou temporaire (autres valeurs). Nous avons utilisé cette propriété dans le didacticiel 2.
content_type:Utilisé pour décrire le type MIME du codage. Par exemple, dans le codage JSON fréquemment utilisé, cette propriété est définie sur "application"./Il est recommandé de le définir sur "json".
reply_to:Habituellement utilisé pour nommer une file d'attente de rappel.
correlation_id:Utile pour mapper les réponses RPC aux demandes.

ID de corrélation

Dans la méthode ci-dessus, nous avons suggéré de générer une file d'attente de rappel pour toutes les demandes RPC. C'est assez inefficace, mais heureusement, il existe un meilleur moyen, générons une file d'attente de rappel pour chaque client.

Cela crée un nouveau problème lorsqu'une réponse est reçue dans une file d'attente et que la demande à laquelle appartient la réponse n'est pas claire. Par conséquent, utilisez la propriété correlation_id. Définissez une valeur unique pour chaque demande. Lorsqu'un message est reçu ultérieurement dans la file d'attente de rappel, la réponse peut être associée à la demande en fonction de cette propriété. Si vous avez une valeur de correlation_id que vous ne connaissez pas, vous pouvez supprimer le message en toute sécurité, il n'appartient pas à notre demande.

Je dois ignorer les messages inconnus dans la file d'attente de rappel au lieu d'échouer avec une erreur, pourquoi? Cela est dû à une condition de conflit possible côté serveur. Il est peu probable que le serveur RPC meure juste après l'envoi de la réponse et avant l'envoi du message d'accusé de réception de la demande. Dans ce cas, le serveur RPC redémarré traitera à nouveau la demande. C'est pourquoi les réponses en double sur le client doivent être gérées avec élégance, et le RPC doit idéalement être brutal.

emballer

Notre RPC fonctionne comme suit:

Lorsque le client démarre, il crée une file d'attente de rappel anonyme et exclusive. Pour les demandes RPC, le client envoie un message avec deux propriétés: reply_to avec une file d'attente de rappel et correlation_id avec une valeur unique pour chaque demande. La requête est envoyée à la file d'attente "rpc_queue". Un travailleur RPC (communément appelé serveur) attend une demande pour cette file d'attente. Lorsque la demande apparaît, il exécute le travail et utilise la file d'attente spécifiée dans le champ reply_to pour envoyer les résultats et le message au client. Le client attend des données dans la file d'attente de rappel. Lorsque le message apparaît, vérifiez la propriété correlation_id. S'il correspond à la valeur de la demande, il renvoie une réponse à l'application.

Tout résumé

Code pour rpc_server.py:

 1    #!/usr/bin/env python
 2    import pika
 3
 4    connection = pika.BlockingConnection(pika.ConnectionParameters(
 5            host='localhost'))
 6
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='rpc_queue')
10
11    def fib(n):
12        if n == 0:
13            return 0
14        elif n == 1:
15            return 1
16        else:
17            return fib(n-1) + fib(n-2)
18
19    def on_request(ch, method, props, body):
20        n = int(body)
21
22        print " [.] fib(%s)"  % (n,)
23        response = fib(n)
24
25        ch.basic_publish(exchange='',
26                         routing_key=props.reply_to,
27                         properties=pika.BasicProperties(correlation_id = \
28                                                         props.correlation_id),
29                         body=str(response))
30        ch.basic_ack(delivery_tag = method.delivery_tag)
31
32    channel.basic_qos(prefetch_count=1)
33    channel.basic_consume(on_request, queue='rpc_queue')
34
35    print " [x] Awaiting RPC requests"
36    channel.start_consuming()

Le code serveur est assez simple:

(4) Comme toujours, commencez par établir une connexion et déclarer une file d'attente. (11) Déclarez la fonction Fibonacci. Cela suppose uniquement une entrée entière positive valide. (Ne vous attendez pas à travailler pour de grands nombres, c'est probablement l'implémentation récursive la plus lente des implémentations possibles). (19) Déclarez le cœur du serveur RPC, le rappel de basic_consume. Exécuté lorsqu'une demande est reçue. Travaillez et répondez avec une réponse. (32) Vous souhaiterez peut-être exécuter plusieurs processus serveur. Vous devez définir prefetch_count pour répartir uniformément la charge sur plusieurs serveurs.

Code pour rpc_client.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import uuid
 4
 5    class FibonacciRpcClient(object):
 6        def __init__(self):
 7            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 8                    host='localhost'))
 9
10            self.channel = self.connection.channel()
11
12            result = self.channel.queue_declare(exclusive=True)
13            self.callback_queue = result.method.queue
14
15            self.channel.basic_consume(self.on_response, no_ack=True,
16                                       queue=self.callback_queue)
17
18        def on_response(self, ch, method, props, body):
19            if self.corr_id == props.correlation_id:
20                self.response = body
21
22        def call(self, n):
23            self.response = None
24            self.corr_id = str(uuid.uuid4())
25            self.channel.basic_publish(exchange='',
26                                       routing_key='rpc_queue',
27                                       properties=pika.BasicProperties(
28                                             reply_to = self.callback_queue,
29                                             correlation_id = self.corr_id,
30                                             ),
31                                       body=str(n))
32            while self.response is None:
33                self.connection.process_data_events()
34            return int(self.response)
35
36    fibonacci_rpc = FibonacciRpcClient()
37
38    print " [x] Requesting fib(30)"
39    response = fibonacci_rpc.call(30)
40    print " [.] Got %r" % (response,)

Le code client est un peu compliqué:

(7) Établissez une connexion, canalisez et déclarez une file d'attente de "rappel" exclusive pour les réponses. (16) Abonnez-vous à la file d'attente "Callback" afin de pouvoir recevoir des réponses RPC. (18) Le rappel "on_response" effectué sur chaque réponse fait une tâche très simple, pour chaque message de réponse, il vérifie si le correlation_id est ce que nous recherchons. Si tel est le cas, enregistrez la réponse dans self.response et interrompez la boucle de consommation. (23) Ensuite, définissez la méthode d'appel principale, où vous effectuez la requête RPC réelle. (24) Dans cette méthode, commencez par générer un numéro de corrélation_id unique et enregistrez-le, la fonction de rappel "on_response" utilise cette valeur pour capturer la réponse appropriée. (25) Ensuite, émettez un message de requête avec deux propriétés ("reply_to" et "correlation_id"). (32) Maintenant, calmez-vous et attendez que la réponse appropriée arrive. (33) Et enfin, il revient en arrière et renvoie une réponse à l'utilisateur.

Le service RPC est prêt. Vous pouvez démarrer le serveur:

$ python rpc_server.py
 [x] Awaiting RPC requests

Exécutez le client pour demander le numéro de Fibonacci:

$ python rpc_client.py
 [x] Requesting fib(30)

Cette conception n'est pas la seule implémentation possible du service RPC, mais elle présente des avantages importants.

Si votre serveur RPC est très lent, vous pouvez le mettre à l'échelle avec une seule exécution supplémentaire. Essayez d'exécuter un deuxième rpc_server.py dans la nouvelle console. Du côté client, le RPC n'a besoin d'envoyer et de recevoir qu'un seul message. Aucun appel synchrone tel que queue_declare n'est nécessaire. Par conséquent, le client RPC n'a besoin que d'une seule orbite réseau pour chaque requête RPC.

Ce code est encore assez simplifié et ne résout pas les problèmes plus complexes (mais importants) tels que:

Comment le client doit-il réagir si aucun serveur n'est en cours d'exécution? Le client doit-il avoir un délai d'attente pour le RPC? Si un serveur tombe en panne et qu'une exception se produit, dois-je le transmettre au client? Protection contre les messages entrants indésirables avant le traitement (par exemple, vérification des limites)

Si vous voulez essayer, rabbitmq pour voir la file d'attente-Le plug-in de gestion est activé.

Recommended Posts

Tutoriel RabbitMQ 6 (RPC)
Tutoriel RabbitMQ 5 (sujet)
Tutoriel RabbitMQ 4 (Routage)
Tutoriel RabbitMQ 3 (Publier / S'abonner)
Tutoriel RabbitMQ 1 ("Hello World!")
didacticiel sqlalchemy
Tutoriel PyODE 2
Tutoriel Python
Tutoriel PyODE 1
Tutoriel PyODE 3
Tutoriel du didacticiel TensorFlow