Ich habe die Verwendung von Rabbit Mq in Letztes Mal und Letztes Mal zusammengefasst. Nachdem wir den Anwendungsbereich etwas erweitert und die Verwendung von Rabbit Mq unter dem Gesichtspunkt der Beziehungen zwischen Produzenten und Verbrauchern untersucht haben, werden wir die Vorteile anhand eines einfachen Beispiels zusammenfassen.
Die vom Produzenten gesendeten Nachrichten werden in einer Warteschlange gespeichert, und der Verbraucher empfängt sie in dieser Reihenfolge und führt den Prozess aus.
Wenn Sie einen Job ausführen, der nicht gleichzeitig auf einer Website ausgeführt werden kann, die von mehreren Personen verwendet wird.
Der Warteschlangenprozess endet, ohne auf den Prozess des Verbrauchers zu warten, sodass die Nachricht nur zweimal gesendet wird.
client_main.py
import pika
import datetime
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
print('Send Message 1 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 1')
print('Send Message 1 End. {}'.format(datetime.datetime.now()))
print('Send Message 2 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 2')
print('Send Message 2 End. {}'.format(datetime.datetime.now()))
Wenn Sie das Beispiel ausführen, wird die Zeit zum Senden und Empfangen angezeigt und endet. Wenn Sie sich auf dem Verwaltungsbildschirm die Warteschlangen ansehen, sehen Sie, dass die Nachricht zwei enthält.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 1 Start. 2020-03-08 18:53:45.658027
Send Message 1 End. 2020-03-08 18:53:45.659027
Send Message 2 Start. 2020-03-08 18:53:45.659027
Send Message 2 End. 2020-03-08 18:53:45.660026
Der Empfänger bereitet einfach eine Funktion vor, die den Prozess aus der Warteschlange entnimmt und ausführt.
host_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("{} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
Wenn das Beispiel ausgeführt wird, wird die Verarbeitung in derselben Reihenfolge ausgeführt, in der sie ordnungsgemäß gesendet wurde. Da wir die Nachricht verarbeitet haben, können Sie beim Betrachten von Warteschlangen auf dem Verwaltungsbildschirm sehen, dass die Nachricht 0 ist.
PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World 1' Received. 2020-03-08 19:02:11.756469
b'Hello World 2' Received. 2020-03-08 19:02:11.757469
Produzenten und Konsumenten können unabhängig voneinander arbeiten, sodass Sie die Anzahl der Produzenten und Konsumenten leicht erhöhen können. Darüber hinaus wird die Nachrichtenverteilung standardmäßig verteilt. Wenn Sie also mehrere Verbraucher hinzufügen, wird sie auf natürliche Weise verteilt.
Da die Anzahl der Systembenutzer zunimmt und die Rückverarbeitung unzureichend ist, werden wir die Anzahl der Verbraucher erhöhen, die sich damit befassen.
Ich möchte die Anzahl der Verbraucher erhöhen, während der Produzent läuft, also sende ich ungefähr 20 Mal eine Nachricht und erhöhe in der Zwischenzeit die Anzahl der Verbraucher.
import time
import datetime
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
for i in range(20):
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World {}'.format(i))
print('Send Message {} Exec. {}'.format(i, datetime.datetime.now()))
time.sleep(2)
connection.close()
Der Empfänger hat zwei Funktionen vorbereitet, um den Prozess aus der Warteschlange zu nehmen und auszuführen. Ich habe eine Nummer in die anzuzeigende Nachricht eingefügt, damit sie als jede Quelle identifiziert werden kann.
host1_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host1 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
host2_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host 2 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
Führen Sie bei der Ausführung die folgende Reihenfolge aus, damit die sendende Seite die empfangende Seite beim Start erhöht.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 19:48:12.834261
Send Message 1 Exec. 2020-03-08 19:48:14.835843
Send Message 2 Exec. 2020-03-08 19:48:16.838815
Send Message 3 Exec. 2020-03-08 19:48:18.839815
Send Message 4 Exec. 2020-03-08 19:48:20.840815
Send Message 5 Exec. 2020-03-08 19:48:22.841815
Send Message 6 Exec. 2020-03-08 19:48:24.842788
Send Message 7 Exec. 2020-03-08 19:48:26.843861
Send Message 8 Exec. 2020-03-08 19:48:28.845190
Send Message 9 Exec. 2020-03-08 19:48:30.845934
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 0' Received. 2020-03-08 19:48:12.836260
Host1 b'Hello World 1' Received. 2020-03-08 19:48:14.839838
Host1 b'Hello World 2' Received. 2020-03-08 19:48:16.841816
Host1 b'Hello World 3' Received. 2020-03-08 19:48:18.840818
Host1 b'Hello World 4' Received. 2020-03-08 19:48:20.842817
Host1 b'Hello World 6' Received. 2020-03-08 19:48:24.844791
Host1 b'Hello World 8' Received. 2020-03-08 19:48:28.847190
PS C:\Users\xxxx\program\python\pika> python .\host2_main.py
Host 2 b'Hello World 5' Received. 2020-03-08 19:48:22.843819
Host 2 b'Hello World 7' Received. 2020-03-08 19:48:26.845863
Host 2 b'Hello World 9' Received. 2020-03-08 19:48:30.847937
Wenn Sie sich das Ausführungsergebnis ansehen, sehen Sie, dass die Nachricht auch auf die Verbraucherseite gelangt, die zu dem Zeitpunkt zugenommen hat, als die Anzahl der Verbraucher erhöht wurde. Außerdem funktioniert die verteilte Verarbeitung von Nachrichten normal und die Nachrichten werden abwechselnd verarbeitet.
Entspricht dem einfachen Beispiel für das Hinzufügen eines Produzenten / Verbrauchers.
Entspricht dem einfachen Beispiel für das Hinzufügen eines Produzenten / Verbrauchers.
Die Clients von RabbitMq verfügen über Bibliotheken in verschiedenen Sprachen, sodass Sie Nachrichten austauschen können, ohne sich der Unterschiede zwischen den Sprachen bewusst zu sein. Darüber hinaus arbeiten nicht nur Hersteller und Verbraucher, sondern auch Hersteller und Verbraucher unabhängig voneinander, sodass die Sprachen nicht vereinheitlicht werden müssen.
Wenn Sie einen Dienst bereitstellen möchten, der in verschiedenen Sprachen als ein Dienst ausgeführt wird, und einen integrierten Dienst erstellen möchten, indem Sie den alten Dienst so wenig wie möglich verwenden.
Verwenden Sie den Python-Produzenten oben.
Verwenden Sie `host1_main.py
`, das oben als Python geschrieben wurde. Verwenden Sie die folgende Quelle als Kotlin als andere Sprache.
host_kotlin_main.kt
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
fun main(argv: Array<String>) {
val factory = ConnectionFactory()
factory.host = "localhost"
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare("hello", false, false, false, null)
val callback = object : DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray) {
val message = String(body, charset("UTF-8"))
println("Host Kotlin '$message' Received")
channel.basicAck(envelope.deliveryTag, false)
}
}
channel.basicConsume("hello", false, callback)
}
Beim Laufen habe ich den Verbraucher und dann den Produzenten gestartet.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 21:21:06.572582
Send Message 1 Exec. 2020-03-08 21:21:08.573535
Send Message 2 Exec. 2020-03-08 21:21:10.574442
Send Message 3 Exec. 2020-03-08 21:21:12.575198
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 1' Received. 2020-03-08 21:21:08.575536
Host1 b'Hello World 3' Received. 2020-03-08 21:21:12.577199
"C:\Program Files\Java\bin\java.exe" .... Host_kotlin_mainKt
Host Kotlin 'Hello World 0' Received
Host Kotlin 'Hello World 2' Received
Wenn Sie sich das Ausführungsergebnis ansehen, sehen Sie, dass es normal funktioniert, unabhängig davon, ob der Consumer Python oder Kotlin (Java) ist.
Ich habe versucht, die bequeme Verwendung von Rabbit Mq zusammenzufassen. Der größte Teil des RabbitMq befand sich in der Warteschlange oder AMQP, und es gab fast keinen RabbitMq, der einzigartig war. Persönlich fand ich es einfach zu bedienen, weil es sich auf natürliche Weise verteilt und es sehr einfach ist, Produzenten und Verbraucher hinzuzufügen. Dieses Mal habe ich mich auf die Beziehung zwischen Produzenten und Konsumenten konzentriert, aber als ich mich auf die Warteschlange konzentrierte, gab es immer noch viele Funktionen. Ich denke, es wird viel Zeit brauchen, um alles zu erfassen.
Recommended Posts