Rabbit MQ Tutorial 3 https://www.rabbitmq.com/tutorials/tutorial-three-python.html Es ist eine Übersetzung von. Wir freuen uns darauf, auf Übersetzungsfehler hinzuweisen.
In diesem Lernprogramm wird davon ausgegangen, dass RabbitMQ auf dem Standardport (5672) auf dem lokalen Host installiert ist und ausgeführt wird. Wenn Sie einen anderen Host, Port oder Anmeldeinformationen verwenden möchten, müssen Sie Ihre Verbindungseinstellungen anpassen.
Wenn Sie in diesem Tutorial auf Probleme stoßen, können Sie uns über die Mailingliste kontaktieren.
Im vorherigen Lernprogramm haben Sie eine Arbeitswarteschlange erstellt. Bei der Arbeitswarteschlange wird davon ausgegangen, dass jede Aufgabe genau einem Mitarbeiter übergeben wird. In diesem Teil werden wir etwas völlig anderes tun: die Nachricht an mehrere Verbraucher übermitteln. Dieses Muster wird als "Publish / Subscribe" bezeichnet.
Erstellen Sie ein einfaches Protokollierungssystem, um das Muster zu veranschaulichen. Es besteht aus zwei Programmen, das erste dient zum Senden einer Protokollnachricht und das zweite zum Empfangen und Drucken.
Im Protokollierungssystem erhält jede laufende Kopie des Empfängerprogramms eine Nachricht. Daher können Sie einen Empfänger ausführen, um das Protokoll auf die Festplatte zu schreiben, und gleichzeitig den anderen Empfänger ausführen, um das Protokoll auf dem Bildschirm anzuzeigen.
Grundsätzlich werden veröffentlichte Protokollnachrichten an alle Empfänger gesendet.
Im vorherigen Teil des Tutorials haben wir Nachrichten an und von der Warteschlange gesendet und empfangen. Hier stellen wir das vollständige Messaging-Modell von Rabbit vor.
Lassen Sie uns auf das zurückblicken, was wir im vorherigen Tutorial erklärt haben:
Die Kernidee des Messaging-Modells in RabbitMQ besteht darin, dass Produzenten keine Nachrichten direkt an die Warteschlange senden. Tatsächlich wissen Produzenten oft gar nicht, ob eine Nachricht an eine Warteschlange gesendet wird.
Stattdessen können Produzenten nur Nachrichten an eine "Vermittlungsstelle" senden. Der Austausch ist sehr einfach. Empfangen Sie Nachrichten von Produzenten einerseits und schieben Sie sie andererseits in die Warteschlange. Der Austausch muss genau wissen, was mit der empfangenen Nachricht zu tun ist. Soll ich es einer bestimmten Warteschlange hinzufügen? Sollte ich es vielen Warteschlangen hinzufügen? Oder sollte es zerstört werden? Diese Gesetze werden durch die Art des Austauschs definiert.
Es stehen verschiedene Austauschtypen zur Verfügung: direkte Themen, Überschriften, Fan-Outs. Konzentrieren wir uns auf den letzten, das heißt auffächern. Erstellen wir nun einen Austausch dieses Typs und nennen ihn "Protokolle":
channel.exchange_declare(exchange='logs',
type='fanout')
Der Fan-Out-Austausch ist sehr einfach. Wie Sie sich anhand des Namens vorstellen können, sendet es jede empfangene Nachricht an jede ihm bekannte Warteschlange. Und genau das braucht unser Logger.
Liste der Börsen
Verwenden Sie rabbitmqctl, um die Austausche auf dem Server aufzulisten.
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
Es gibt mehrere in dieser Liste"amq.*"Es gibt eine angerufene Vermittlungsstelle und eine standardmäßige (unbenannte) Vermittlungsstelle. Diese werden standardmäßig erstellt, müssen jedoch derzeit nur selten verwendet werden.
Anonymer Austausch
Im vorherigen Teil des Tutorials wusste ich nichts über den Austausch, aber ich konnte eine Nachricht an die Warteschlange senden. Leerer String ("") Wurde verwendet, weil der Standardaustausch verwendet wurde, der durch gekennzeichnet ist.
Denken Sie daran, wie Sie Ihre Nachricht veröffentlicht haben:
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
Der Austauschparameter ist der Name des Austauschs. Eine leere Zeichenfolge bedeutet einen standardmäßigen oder anonymen Austausch. Wenn vorhanden, wird die Nachricht weitergeleitet_Es wird mit dem durch den Schlüssel angegebenen Namen an die Warteschlange weitergeleitet.
Stattdessen können Sie an einer benannten Börse veröffentlichen:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
Zuvor haben Sie eine Warteschlange mit dem angegebenen Namen verwendet (erinnern Sie sich an hallo und task_queue?). Die Benennung der Warteschlange ist sehr wichtig. Die Mitarbeiter mussten auf dieselbe Warteschlange verweisen. Das Benennen einer Warteschlange ist wichtig, wenn Sie die Warteschlange zwischen Produzenten und Verbrauchern teilen möchten.
Dies ist bei unseren Loggern jedoch nicht der Fall. Ich möchte alles hören, nicht Teil der Protokollnachricht. Wir interessieren uns auch nur für die Nachrichten, die gerade fließen, nicht für die alten Nachrichten. Zwei Dinge sind erforderlich, um dies zu lösen.
Wenn Sie eine Verbindung zu Rabbit herstellen, benötigen Sie zunächst eine neue, leere Warteschlange. Zu diesem Zweck können Sie eine Warteschlange mit einem zufälligen Namen erstellen. Besser, der Server wählt einen zufälligen Warteschlangennamen für Sie aus. Sie können dies tun, indem Sie queue_declare ** keine Warteschlangenparameter ** geben:
result = channel.queue_declare()
Wobei result.method.queue einen zufälligen Warteschlangennamen enthält. Zum Beispiel "amq.gen-JzTY20BRgKO-HjmUJj0wLg".
Zweitens sollte die Warteschlange gelöscht werden, sobald der Verbraucher getrennt wird. Dafür gibt es eine exklusive Flagge:
result = channel.queue_declare(exclusive=True)
Bisher haben Sie einen Fan-Out-Austausch und eine Warteschlange erstellt. Als Nächstes müssen Sie den Austausch anweisen, eine Nachricht an die Warteschlange zu senden. Die Beziehung zwischen einem Austausch und einer Warteschlange wird als Bindung bezeichnet.
channel.queue_bind(exchange='logs',
queue=result.method.queue)
Der Austausch "Protokolle" fügt die Nachricht nun unserer Warteschlange hinzu.
Liste der Bindungen
Wie Sie sich vorstellen können, listet rabbitmqctl_Sie können Bindungen verwenden, um vorhandene Bindungen aufzulisten.
Das Produzentenprogramm, das die Protokollnachricht sendet, unterscheidet sich nicht wesentlich vom vorherigen Lernprogramm. Die wichtigste Änderung besteht darin, die Nachricht im Austausch "Protokolle" anstelle des anonymen Austauschs zu veröffentlichen. Sie müssen routing_key beim Senden angeben, dieser Wert wird jedoch ignoriert, da es sich um einen Fanout-Austausch handelt. Code für das Skript emit_log.py:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='logs',
10 type='fanout')
11
12 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13 channel.basic_publish(exchange='logs',
14 routing_key='',
15 body=message)
16 print " [x] Sent %r" % (message,)
17 connection.close()
Wie Sie sehen können, deklarieren Sie nach dem Herstellen der Verbindung den Austausch. Dieser Schritt ist erforderlich, da das Veröffentlichen an nicht vorhandenen Börsen verboten ist.
Wenn keine Warteschlange an den Austausch gebunden ist, geht die Nachricht verloren, aber das ist in Ordnung. Wenn kein Verbraucher es erhalten hat, kann die Nachricht sicher verworfen werden.
Code für receive_logs.py:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 channel.queue_bind(exchange='logs',
15 queue=queue_name)
16
17 print ' [*] Waiting for logs. To exit press CTRL+C'
18
19 def callback(ch, method, properties, body):
20 print " [x] %r" % (body,)
21
22 channel.basic_consume(callback,
23 queue=queue_name,
24 no_ack=True)
25
26 channel.start_consuming()
Das ist es. Wenn Sie das Protokoll in einer Datei speichern möchten, öffnen Sie die Konsole und geben Sie Folgendes ein:
$ python receive_logs.py > logs_from_rabbit.log
Wenn Sie die Bildschirmprotokolle anzeigen möchten, starten Sie ein neues Terminal und führen Sie Folgendes aus:
$ python receive_logs.py
Und um das Protokoll zu senden, geben Sie natürlich Folgendes ein:
$ python emit_log.py
Mit rabbitmqctl list_bindings können Sie überprüfen, ob Ihr Code tatsächlich die gewünschten Bindungen und Warteschlangen erstellt. Wenn ich die beiden Programme receive_logs.py ausführe, erhalte ich Folgendes:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
Die Interpretation des Ergebnisses ist unkompliziert: Daten, die aus dem Austausch "Protokolle" stammen, werden in zwei Warteschlangen mit vom Server zugewiesenen Namen gestellt. Genau wie vorgesehen.
Fahren wir mit Tutorial 4 fort, um zu erfahren, wie Sie eine Teilmenge von Nachrichten empfangen.