[PYTHON] RabbitMQ Tutorial 4 (Routing)

RabbitMQ Tutorial 4 https://www.rabbitmq.com/tutorials/tutorial-four-python.html It is a translation of. We look forward to pointing out any translation errors.

Prerequisites

This tutorial assumes that RabbitMQ is installed and running on the standard port (5672) on the local host. If you want to use a different host, port, or credentials, you need to adjust your connection settings.

If a problem occurs

If you encounter any issues through this tutorial, you can contact us through the mailing list.

routing

(Using pika 0.9.8 Python client)

In the previous tutorial, we built a simple logging system. I was able to broadcast the log message to many receivers.

This tutorial will add functionality to it, which will allow you to subscribe to only part of the message. For example, you can output only fatal error messages (to save disk space) to a log file and continue to output all messages to the console.

binding

In the previous example, you have already generated the binding. You will recall the code below:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

A binding is the relationship between an exchange and a queue. It can be read that this queue is interested in the messages from this exchange.

The binding can take an additional routing_key parameter. To avoid confusion with the parameters in basic_publish, we'll call it the binding key. Bindings with keys can be generated as follows:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

The meaning of the binding key depends on the exchange type. Previously used fanout exchanges simply ignored that value.

Direct exchange

The logging system from the previous tutorial broadcasts all messages to all consumers. I would like to extend this so that I can filter messages based on severity. For example, you can write only the fatal error log messages you receive to disk so that you do not waste disk space with warning or information-level log messages.

The fanout exchange I've used so far isn't very flexible, it can only broadcast blindly.

Instead, use the direct exchange. The direct exchange routing algorithm is simple: the message goes to a queue with a binding key that exactly matches the routing key.

For the sake of explanation, consider the following settings:

(Figure)

In this setting, there is a direct exchange "X" and two queues combined with it. The first queue has two bindings with the "orange" binding key and the second queue has two bindings, one with the "black" binding key and the other with the "green" binding key. ..

With such a configuration, the publish to the exchange of messages with the routing key "orange" is routed to the "Q1" queue. Messages with a routing key of "black" or "green" go to "Q2". All other messages will be discarded.

Multiple bindings

It's perfectly fine to combine multiple queues with the same binding key. In this example, you can add a "black" key join between "X" and "Q1". In that case, the direct exchange behaves like a fanout, broadcasting the message to all matching queues. Messages with the routing key "black" will be delivered to both "Q1" and "Q2".

Issuing logs

Use this model for your logging system. Send a message to Direct Exchange instead of fanout. Gives the log severity as a routing key. By doing so, the receiving script can select the severity it wants to receive. Let's focus on publishing logs first.

As always, you need to generate the exchange first:

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

And you're ready to send a message:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

For simplicity, the severity is either "info", "warning", or "error".

Subscribe

Receiving a message works the same as in the previous tutorial, with one exception, generating a new binding for each severity of interest.

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

All summary

Code of emit_log_direct.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='direct_logs',
10                             type='direct')
11
12    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
13    message = ' '.join(sys.argv[2:]) or 'Hello World!'
14    channel.basic_publish(exchange='direct_logs',
15                          routing_key=severity,
16                          body=message)
17    print " [x] Sent %r:%r" % (severity, message)
18    connection.close()

Code for receive_logs_direct.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='direct_logs',
10                             type='direct')
11
12    result = channel.queue_declare(exclusive=True)
13    queue_name = result.method.queue
14
15    severities = sys.argv[1:]
16    if not severities:
17        print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
18                             (sys.argv[0],)
19        sys.exit(1)
20
21    for severity in severities:
22        channel.queue_bind(exchange='direct_logs',
23                           queue=queue_name,
24                           routing_key=severity)
25
26    print ' [*] Waiting for logs. To exit press CTRL+C'
27
28    def callback(ch, method, properties, body):
29        print " [x] %r:%r" % (method.routing_key, body,)
30
31    channel.basic_consume(callback,
32                          queue=queue_name,
33                          no_ack=True)
34
35    channel.start_consuming()

If you want to save only the "wraning" and "error" (excluding "info") log messages to a file, open a console and type:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

If you want to see all log messages on screen, open a new terminal and run:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

And, for example, to issue an "error" log message, simply type:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

Let's move on to Tutorial 5 to learn how to receive messages based on patterns.

Recommended Posts

RabbitMQ Tutorial 4 (Routing)
RabbitMQ Tutorial 6 (RPC)
RabbitMQ Tutorial 2 (Work Queue)
RabbitMQ Tutorial 3 (Publish / Subscribe)
RabbitMQ Tutorial 1 ("Hello World!")
sqlalchemy tutorial
PyODE Tutorial 2
Python tutorial
PyODE Tutorial 1
PyODE Tutorial 3
TensorFlow tutorial tutorial