[PYTHON] RabbitMQ Tutorial 2 (Arbeitswarteschlange)

Rabbit MQ Tutorial 2 https://www.rabbitmq.com/tutorials/tutorial-two-python.html Es ist eine Übersetzung von. Wir freuen uns darauf, auf Übersetzungsfehler hinzuweisen.

Voraussetzungen

In diesem Lernprogramm wird davon ausgegangen, dass RabbitMQ auf dem Standardport (5672) auf dem lokalen Host installiert ist und ausgeführt wird. Wenn Sie einen anderen Host, Port oder Anmeldeinformationen verwenden möchten, müssen Sie Ihre Verbindungseinstellungen anpassen.

Wenn ein Problem auftritt

Wenn Sie in diesem Tutorial auf Probleme stoßen, können Sie uns über die Mailingliste kontaktieren.

Arbeitswarteschlange

(Verwenden des Python-Clients pika 0.9.8)

Im ersten Tutorial habe ich ein Programm zum Senden und Empfangen von Nachrichten aus einer benannten Warteschlange geschrieben. Dadurch wird eine Arbeitswarteschlange erstellt, mit der zeitaufwändige Aufgaben auf mehrere Mitarbeiter verteilt werden.

Die Hauptidee hinter Arbeitswarteschlangen (allgemein als Aufgabenwarteschlangen bezeichnet) besteht darin, ressourcenintensive Aufgaben sofort auszuführen und nicht darauf zu warten, dass sie abgeschlossen werden. Planen Sie die Aufgabe stattdessen so, dass sie später ausgeführt wird. Kapseln Sie die Aufgabe als Nachricht und senden Sie sie an die Warteschlange. Ein im Hintergrund ausgeführter Arbeitsprozess ruft die Aufgabe ab und führt schließlich den Job aus. Wenn Sie viele Mitarbeiter ausführen, werden die Aufgaben unter ihnen geteilt.

Dieses Konzept ist besonders nützlich in Webanwendungen, in denen komplexe Aufgaben während einer kurzen HTTP-Anforderung nicht ausgeführt werden können.

Vorbereitung

Im vorherigen Teil dieses Tutorials haben Sie eine Nachricht mit "Hallo Welt!" Gesendet. Wir werden eine Zeichenfolge senden, die eine komplexe Aufgabe darstellt. Es gibt keine realen Aufgaben wie das Ändern der Größe von Bildern oder gerenderten PDF-Dateien. Lassen Sie uns dies also vortäuschen, indem Sie mit der Funktion time.sleep () vortäuschen, beschäftigt zu sein Tu es. Lassen Sie uns die Komplexität durch die Anzahl der Punkte in der Zeichenfolge zeigen. Nehmen wir an, dass jeder Punkt eine Sekunde "Arbeit" einnimmt. Zum Beispiel dauert eine gefälschte Aufgabe mit der Aufschrift "Hallo ..." 3 Sekunden.

Ändern Sie den send.py-Code aus dem vorherigen Beispiel geringfügig, sodass alle Nachrichten über die Befehlszeile gesendet werden. Dieses Programm plant Aufgaben in der Arbeitswarteschlange. Nennen wir sie also new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

Das vorherige Skript "receive.py" erfordert auch einige Änderungen. Für jeden im Nachrichtentext enthaltenen Punkt ist ein gefälschter Ein-Sekunden-Job erforderlich. Rufen wir diese worker.py auf, weil sie die Nachricht aus der Warteschlange nimmt und die Aufgabe ausführt:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

Round Robin Versand

Einer der Vorteile der Verwendung von Aufgabenwarteschlangen besteht darin, dass Sie Ihre Arbeit problemlos parallelisieren können. Wenn Sie einen Arbeitsstau aufbauen, können Sie weitere Mitarbeiter hinzufügen, sodass die Skalierung problemlos möglich ist.

Lassen Sie uns zunächst zwei worker.py-Skripte gleichzeitig ausführen. Sie erhalten Nachrichten aus beiden Warteschlangen, aber wie genau? Lass uns einen Blick darauf werfen.

Sie müssen drei Konsolen öffnen. Die beiden führen das Skript worker.py aus. Diese Konsolen sind zwei Verbraucher, "C1" und "C2".

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Veröffentlichen Sie eine neue Aufgabe in der dritten Konsole. Nach dem Starten des Verbrauchers können Sie einige Nachrichten veröffentlichen.

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

Mal sehen, ob es an die Arbeiter geliefert wird:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

Standardmäßig sendet RabbitMQ jede Nachricht an den nächsten Verbraucher in der Spalte. Im Durchschnitt erhalten alle Verbraucher die gleiche Anzahl von Nachrichten. Diese Methode zur Zustellung von Nachrichten wird als Round Robin bezeichnet. Versuchen Sie dies mit 3 oder mehr Arbeitern.

Nachrichtenbestätigung

Die Ausführung der Aufgabe kann einige Sekunden dauern. Sie fragen sich vielleicht, was passiert, wenn einer der Verbraucher eine lange Aufgabe startet, sie teilweise erledigt und stirbt. Im aktuellen Code übermittelt RabbitMQ die Nachricht einmal an den Kunden und entfernt sie sofort aus dem Speicher. In diesem Fall verlieren Sie die Nachricht, die verarbeitet wird, wenn Sie den Arbeiter töten. Sie verlieren auch alle Nachrichten, die an einen bestimmten Mitarbeiter gesendet, aber noch nicht verarbeitet wurden.

Aber ich möchte nicht alle Aufgaben verlieren. Wenn ein Arbeiter stirbt, möchte ich, dass die Aufgabe an andere Arbeiter übergeben wird.

Um sicherzustellen, dass Ihre Nachrichten niemals verloren gehen, unterstützt RabbitMQ Nachrichtenbestätigungen. Die Bestätigung (nowledgement) wird vom Verbraucher zurückgegeben, um RabbitMQ darüber zu informieren, dass eine bestimmte Nachricht empfangen und verarbeitet wurde, und RabbitMQ kann sie löschen.

Wenn ein Verbraucher stirbt, ohne eine Bestätigung zu senden, erkennt RabbitMQ, dass die Nachricht nicht vollständig verarbeitet wurde, und verteilt sie an einen anderen Verbraucher. Auf diese Weise können Sie sicher sein, dass Ihre Nachrichten nicht verloren gehen, wenn der Mitarbeiter gelegentlich stirbt.

Es gibt kein Nachrichtenzeitlimit. RabbitMQ verteilt die Nachricht nur dann neu, wenn die Worker-Verbindung unterbrochen wird. Die Verarbeitung der Nachricht kann sehr, sehr lange dauern.

Die Nachrichtenbestätigung ist standardmäßig aktiviert. Im vorherigen Beispiel wurde es explizit über das Flag no_ack = True deaktiviert. Entfernen Sie dieses Flag und lassen Sie den Mitarbeiter die entsprechende Bestätigung senden, wenn die Aufgabe abgeschlossen ist.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Mit diesem Code können Sie beim Verarbeiten einer Nachricht STRG + C verwenden, um sicherzustellen, dass beim Töten eines Arbeiters nichts verloren geht. Unmittelbar nach dem Tod des Arbeiters werden alle unbeantworteten Nachrichten neu verteilt.

Vergessene Bestätigung

    basic_Das Vergessen zu bestätigen ist ein häufiger Fehler. Es ist ein einfacher Fehler, aber das Ergebnis ist schwerwiegend. Die Nachricht wird beim Beenden des Clients erneut zugestellt (obwohl dies möglicherweise wie eine zufällige erneute Zustellung aussieht), es ist jedoch nicht mehr möglich, nicht gepackte Nachrichten freizugeben, und RabbitMQ verwendet mehr Speicher Wird kommen, um zu tun.

Um diese Art von Fehler zu debuggen, Nachrichten_Sie können rabbitmqctl verwenden, um das nicht bestätigte Feld auszugeben:

    $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello    0       0
    ...done.

Dauerhaftigkeit der Nachricht

Sie haben gelernt, wie Sie sicherstellen können, dass Ihre Aufgaben nicht verloren gehen, wenn der Verbraucher stirbt. Selbst wenn der RabbitMQ-Server ausfällt, geht die Arbeit dennoch verloren.

Wenn RabbitMQ beendet wird oder abstürzt, vergessen Sie Ihre Warteschlangen und Nachrichten, es sei denn, Sie weisen sie an, dies nicht zu tun. Zwei Dinge sind erforderlich, um sicherzustellen, dass Nachrichten nicht verloren gehen: Sowohl die Warteschlange als auch die Nachricht sollten als dauerhaft markiert werden.

Zunächst müssen Sie verhindern, dass RabbitMQ die Warteschlange verliert. Daher muss es als * dauerhaft * deklariert werden:

channel.queue_declare(queue='hello', durable=True)

Dieser Befehl ist an sich korrekt, funktioniert aber in unserem Setup nicht. Dies liegt daran, dass die Warteschlange mit dem Namen "Hallo" bereits als nicht dauerhaft definiert ist. RabbitMQ kann eine vorhandene Warteschlange mit anderen Parametern nicht neu definieren und gibt einen Fehler an jedes Programm zurück, das dies versucht. Es gibt jedoch eine schnelle Problemumgehung. Definieren Sie die Warteschlange mit einem anderen Namen, z. B. task_queue:

channel.queue_declare(queue='task_queue', durable=True)

Diese Änderung von queue_declare sollte sowohl auf den Produzenten- als auch auf den Konsumentencode angewendet werden.

Jetzt können Sie sehen, dass die Task_queue-Warteschlange beim Neustart von RabbitMQ nicht verloren geht. Als nächstes markieren wir die Nachricht als persistent, indem wir delivery_mode den Wert 2 geben.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
Hinweise zur Nachrichtenpersistenz

Das Markieren einer Nachricht als dauerhaft garantiert nicht, dass die Nachricht niemals verloren geht. Es weist RabbitMQ an, die Nachricht auf der Festplatte zu speichern, aber es gibt immer noch eine kurze Zeitverzögerung zwischen der Annahme der Nachricht durch RabbitMQ und der noch nicht gespeicherten Nachricht. RabbitMQ fsyncs auch für jede Nachricht(2)Wird nicht ausgeführt, wird möglicherweise nur zwischengespeichert und nicht tatsächlich auf die Festplatte geschrieben. Die Persistenzgarantie ist nicht stark, aber für eine einfache Aufgabenwarteschlange mehr als ausreichend. Wenn Sie eine starke Garantie benötigen, können Sie die Bestätigung des Herausgebers verwenden.

Fairer Versand

Möglicherweise haben Sie bemerkt, dass der Versand immer noch nicht wie gewünscht funktioniert. Wenn Sie beispielsweise zwei Mitarbeiter haben, sind alle ungeraden Nachrichten schwer und gerade Nachrichten leicht, dann ist ein Mitarbeiter immer beschäftigt und der andere erledigt wenig Arbeit. RabbitMQ weiß nichts darüber und versendet weiterhin gleichmäßig Nachrichten.

Dies geschieht, weil RabbitMQ die Nachricht einfach in der Warteschlange versendet. Ich habe die Anzahl der nicht bestätigten Nachrichten für Verbraucher nicht gesehen. Senden Sie einfach blind alle n-ten Nachrichten an den n-ten Verbraucher.

Um dies zu vermeiden, können Sie die Methode basic.qos mit der Einstellung prefetch_count = 1 verwenden. Dies weist RabbitMQ an, einem Mitarbeiter nicht mehrere Nachrichten gleichzeitig zu senden. Mit anderen Worten, es wird keine neue Nachricht an den Worker gesendet, bis der Worker die vorherige Nachricht verarbeitet und bestätigt hat. Senden Sie es stattdessen an den nächsten Mitarbeiter, der noch nicht beschäftigt ist.

channel.basic_qos(prefetch_count=1)
Hinweis zur Warteschlangengröße

Die Warteschlange kann voll sein, wenn alle Mitarbeiter verwendet werden. In solchen Fällen müssen Sie mehr Mitarbeiter hinzufügen oder andere Strategien anwenden.

Alle Zusammenfassung

Endgültiger Code für das Skript new_task.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='task_queue', durable=True)
10
11    message = ' '.join(sys.argv[1:]) or "Hello World!"
12    channel.basic_publish(exchange='',
13                          routing_key='task_queue',
14                          body=message,
15                          properties=pika.BasicProperties(
16                             delivery_mode = 2, # make message persistent
17                          ))
18    print " [x] Sent %r" % (message,)
19    connection.close()

Und Arbeiter:

 1    #!/usr/bin/env python
 2    import pika
 3    import time
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='task_queue', durable=True)
10    print ' [*] Waiting for messages. To exit press CTRL+C'
11
12    def callback(ch, method, properties, body):
13        print " [x] Received %r" % (body,)
14        time.sleep( body.count('.') )
15        print " [x] Done"
16        ch.basic_ack(delivery_tag = method.delivery_tag)
17
18    channel.basic_qos(prefetch_count=1)
19    channel.basic_consume(callback,
20                          queue='task_queue')
21
22    channel.start_consuming()

Sie können die Nachrichtenbestätigung und prefetch_count verwenden, um eine Arbeitswarteschlange einzurichten. Die Haltbarkeitsoption stellt sicher, dass die Aufgabe auch dann überlebt, wenn der Rabbit MQ neu gestartet wird.

Jetzt können Sie mit Tutorial 3 fortfahren und lernen, wie Sie vielen Verbrauchern dieselbe Nachricht übermitteln.

Recommended Posts

RabbitMQ Tutorial 2 (Arbeitswarteschlange)
RabbitMQ Tutorial 5 (Thema)
RabbitMQ Tutorial 6 (RPC)
RabbitMQ Tutorial 4 (Routing)
RabbitMQ Tutorial 3 (Veröffentlichen / Abonnieren)
RabbitMQ Tutorial 1 ("Hallo Welt!")
Warteschlange