Rabbit MQ Tutorial 6 https://www.rabbitmq.com/tutorials/tutorial-six-python.html Es ist eine Übersetzung von. Wir freuen uns darauf, auf Übersetzungsfehler hinzuweisen.
In diesem Lernprogramm wird davon ausgegangen, dass RabbitMQ auf dem Standardport (5672) auf dem lokalen Host installiert ist und ausgeführt wird. Wenn Sie einen anderen Host, Port oder Anmeldeinformationen verwenden möchten, müssen Sie Ihre Verbindungseinstellungen anpassen.
Wenn Sie in diesem Tutorial auf Probleme stoßen, können Sie uns über die Mailingliste kontaktieren.
Im zweiten Lernprogramm haben Sie gelernt, wie Sie mithilfe von Arbeitswarteschlangen zeitaufwändige Aufgaben auf mehrere Mitarbeiter verteilen.
Aber was ist, wenn Sie eine Funktion auf einem Remotecomputer ausführen und auf das Ergebnis warten müssen? Das ist eine andere Geschichte. Dieses Muster wird allgemein als Remote Procedure Call oder RPC bezeichnet.
In diesem Tutorial wird RabbitMQ zum Erstellen eines RPC-Systems (eines Clients und eines skalierbaren RPC-Servers) verwendet. Wir haben keine zeitaufwändige Aufgabe, Werte bereitzustellen. Erstellen wir also einen Dummy-RPC-Service, der die Fibonacci-Anzahl zurückgibt.
Erstellen Sie eine einfache Clientklasse, um zu veranschaulichen, wie der RPC-Dienst verwendet wird. Diese Klasse macht eine Methode namens "call" verfügbar, die eine RPC-Anfrage sendet und blockiert, bis sie eine Antwort erhält:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
Hinweise zu RPC
RPC ist ein ziemlich verbreitetes Muster in der Datenverarbeitung, wird jedoch häufig kritisiert. Das Problem tritt auf, wenn der Programmierer nicht weiß, ob der Funktionsaufruf lokal ist oder ob es sich um einen langsamen RPC handelt. Eine solche Verwirrung hat zur Folge, dass unvorhersehbare Systeme und das Debuggen unnötig komplex werden. Anstatt die Software zu vereinfachen, führt der Missbrauch von RPC zu nicht wartbarem Spaghetti-Code.
Beachten Sie die folgenden Ratschläge:
Stellen Sie sicher, dass klar ist, welche Funktionsaufrufe lokal und welche remote sind.
Dokumentieren Sie Ihr System. Klären Sie die Abhängigkeiten zwischen Komponenten.
Behandeln Sie Fehlerfälle. Wie soll der Client reagieren, wenn der RPC-Server längere Zeit nicht verfügbar ist?
Vermeiden Sie RPC, wenn Sie irgendwelche Zweifel haben. Wenn möglich, sollten Sie eine asynchrone Pipeline anstelle einer Blockierungsmethode wie RPC verwenden. Die Ergebnisse werden asynchron in die nächste Berechnungsstufe verschoben.
Im Allgemeinen ist RPC mit Rabbit MQ einfach. Wenn der Client eine Anforderungsnachricht sendet, antwortet der Server mit einer Antwortnachricht. Um die Antwort zu erhalten, muss der Client der Anfrage die Adresse der "Rückruf" -Warteschlange senden. Lass es uns versuchen:
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
Nachrichteneigenschaften
Das AMQP-Protokoll definiert 14 Eigenschaften, die mit der Nachricht übertragen werden. Die meisten Eigenschaften werden mit den folgenden Ausnahmen selten verwendet:
delivery_mode:Markieren Sie die Nachricht als permanent (Wert 2) oder temporär (andere Werte). Wir haben diese Eigenschaft in Tutorial 2 verwendet.
content_type:Wird verwendet, um den MIME-Typ der Codierung zu beschreiben. In der häufig verwendeten JSON-Codierung wird diese Eigenschaft beispielsweise auf "Anwendung" festgelegt./Es wird empfohlen, "json" einzustellen.
reply_to:Wird normalerweise verwendet, um eine Rückrufwarteschlange zu benennen.
correlation_id:Nützlich für die Zuordnung von RPC-Antworten zu Anforderungen.
Bei der obigen Methode habe ich vorgeschlagen, eine Rückrufwarteschlange für alle RPC-Anforderungen zu generieren. Dies ist ziemlich ineffizient, aber zum Glück gibt es einen besseren Weg, eine Callback-Warteschlange für jeden Client zu generieren.
Das neue Problem tritt jetzt auf, wenn eine Antwort in einer Warteschlange empfangen wird und die Anforderung, zu der die Antwort gehört, nicht klar ist. Verwenden Sie daher die Eigenschaft configuration_id. Legen Sie für jede Anforderung einen eindeutigen Wert fest. Wenn eine Nachricht später in der Rückrufwarteschlange empfangen wird, kann die Antwort der Anforderung basierend auf dieser Eigenschaft zugeordnet werden. Wenn Sie einen Wert von configuration_id haben, den Sie nicht kennen, können Sie die Nachricht sicher verwerfen, sie gehört nicht zu unserer Anfrage.
Ich muss unbekannte Nachrichten in der Rückrufwarteschlange ignorieren, anstatt mit einem Fehler zu scheitern. Warum? Dies ist auf eine mögliche Konfliktbedingung auf der Serverseite zurückzuführen. Es ist unwahrscheinlich, dass der RPC-Server unmittelbar nach dem Senden der Antwort und vor dem Senden der Anforderungsbestätigungsnachricht stirbt. In diesem Fall verarbeitet der neu gestartete RPC-Server die Anforderung erneut. Aus diesem Grund sollten doppelte Antworten auf dem Client ordnungsgemäß behandelt werden, und der RPC sollte im Idealfall stumpf sein.
Unser RPC funktioniert wie folgt:
Wenn der Client gestartet wird, erstellt er eine anonyme, exklusive Rückrufwarteschlange. Bei RPC-Anforderungen sendet der Client eine Nachricht mit zwei Eigenschaften: reply_to mit einer Rückrufwarteschlange und korrelation_id mit einem eindeutigen Wert für jede Anforderung. Die Anfrage wird an die Warteschlange "rpc_queue" gesendet. Ein RPC-Worker (allgemein als Server bezeichnet) wartet auf eine Anforderung für diese Warteschlange. Wenn die Anforderung angezeigt wird, erledigt sie den Job und verwendet die im Feld reply_to angegebene Warteschlange, um die Ergebnisse und die Nachricht an den Client zu senden. Der Client wartet auf Daten in der Rückrufwarteschlange. Wenn die Meldung angezeigt wird, überprüfen Sie die Eigenschaft configuration_id. Wenn es mit dem Wert aus der Anforderung übereinstimmt, gibt es eine Antwort an die Anwendung zurück.
Code für rpc_server.py:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6
7 channel = connection.channel()
8
9 channel.queue_declare(queue='rpc_queue')
10
11 def fib(n):
12 if n == 0:
13 return 0
14 elif n == 1:
15 return 1
16 else:
17 return fib(n-1) + fib(n-2)
18
19 def on_request(ch, method, props, body):
20 n = int(body)
21
22 print " [.] fib(%s)" % (n,)
23 response = fib(n)
24
25 ch.basic_publish(exchange='',
26 routing_key=props.reply_to,
27 properties=pika.BasicProperties(correlation_id = \
28 props.correlation_id),
29 body=str(response))
30 ch.basic_ack(delivery_tag = method.delivery_tag)
31
32 channel.basic_qos(prefetch_count=1)
33 channel.basic_consume(on_request, queue='rpc_queue')
34
35 print " [x] Awaiting RPC requests"
36 channel.start_consuming()
Der Servercode ist ziemlich einfach:
(4) Beginnen Sie wie immer damit, eine Verbindung herzustellen und eine Warteschlange zu deklarieren. (11) Deklarieren Sie die Fibonacci-Funktion. Dies setzt nur die Eingabe gültiger positiver Ganzzahlen voraus. (Erwarten Sie nicht, für große Zahlen zu arbeiten, dies ist wahrscheinlich die langsamste rekursive Implementierung der möglichen Implementierungen). (19) Deklarieren Sie den Kern des RPC-Servers, den Rückruf für basic_consume. Wird ausgeführt, wenn eine Anforderung empfangen wird. Arbeite und antworte mit einer Antwort. (32) Möglicherweise möchten Sie mehrere Serverprozesse ausführen. Sie sollten prefetch_count festlegen, um die Last gleichmäßig auf mehrere Server zu verteilen.
Code für rpc_client.py:
1 #!/usr/bin/env python
2 import pika
3 import uuid
4
5 class FibonacciRpcClient(object):
6 def __init__(self):
7 self.connection = pika.BlockingConnection(pika.ConnectionParameters(
8 host='localhost'))
9
10 self.channel = self.connection.channel()
11
12 result = self.channel.queue_declare(exclusive=True)
13 self.callback_queue = result.method.queue
14
15 self.channel.basic_consume(self.on_response, no_ack=True,
16 queue=self.callback_queue)
17
18 def on_response(self, ch, method, props, body):
19 if self.corr_id == props.correlation_id:
20 self.response = body
21
22 def call(self, n):
23 self.response = None
24 self.corr_id = str(uuid.uuid4())
25 self.channel.basic_publish(exchange='',
26 routing_key='rpc_queue',
27 properties=pika.BasicProperties(
28 reply_to = self.callback_queue,
29 correlation_id = self.corr_id,
30 ),
31 body=str(n))
32 while self.response is None:
33 self.connection.process_data_events()
34 return int(self.response)
35
36 fibonacci_rpc = FibonacciRpcClient()
37
38 print " [x] Requesting fib(30)"
39 response = fibonacci_rpc.call(30)
40 print " [.] Got %r" % (response,)
Der Client-Code ist etwas kompliziert:
(7) Stellen Sie eine Verbindung her, kanalisieren Sie und deklarieren Sie eine exklusive "Rückruf" -Warteschlange für Antworten. (16) Abonnieren Sie die "Rückruf" -Warteschlange, damit Sie RPC-Antworten erhalten können. (18) Der für jede Antwort ausgeführte Rückruf "on_response" führt eine sehr einfache Aufgabe aus. Für jede Antwortnachricht wird geprüft, ob die Korrelations-ID das ist, wonach wir suchen. Wenn ja, speichern Sie die Antwort in self.response und unterbrechen Sie die Verbrauchsschleife. (23) Definieren Sie als Nächstes die Hauptaufrufmethode, bei der Sie die eigentliche RPC-Anforderung stellen. (24) Generieren Sie bei dieser Methode zunächst eine eindeutige Korrelations-ID-Nummer und speichern Sie diese. Die Rückruffunktion "on_response" verwendet diesen Wert, um die entsprechende Antwort zu erfassen. (25) Geben Sie als Nächstes eine Anforderungsnachricht mit zwei Eigenschaften aus ("reply_to" und "configuration_id"). (32) Beruhige dich jetzt und warte auf die entsprechende Antwort. (33) Und schließlich geht es zurück und gibt eine Antwort an den Benutzer zurück.
Der RPC-Dienst ist bereit. Sie können den Server starten:
$ python rpc_server.py
[x] Awaiting RPC requests
Führen Sie den Client aus, um die Fibonacci-Nummer anzufordern:
$ python rpc_client.py
[x] Requesting fib(30)
Dieses Design ist nicht die einzig mögliche Implementierung des RPC-Dienstes, bietet jedoch einige wichtige Vorteile.
Wenn Ihr RPC-Server sehr langsam ist, können Sie ihn mit nur einem weiteren Lauf skalieren. Versuchen Sie, eine zweite rpc_server.py in der neuen Konsole auszuführen. Auf der Clientseite muss der RPC nur eine Nachricht senden und empfangen. Es ist kein synchroner Aufruf wie queue_declare erforderlich. Infolgedessen benötigt der RPC-Client nur eine Netzwerkumlaufbahn für jede RPC-Anforderung.
Dieser Code ist noch ziemlich vereinfacht und löst keine komplexeren (aber wichtigen) Probleme wie:
Wie soll der Client reagieren, wenn kein Server läuft? Sollte der Client eine Zeitüberschreitung für den RPC haben? Wenn ein Server ausfällt und eine Ausnahme auftritt, muss ich ihn an den Client weiterleiten? Schutz vor unerwünschten eingehenden Nachrichten vor der Verarbeitung (z. B. Grenzüberprüfung).
Wenn Sie es versuchen möchten, können Sie mit rabbitmq die Warteschlange anzeigen-Das Management-Plug-In ist gültig.
Recommended Posts