[PYTHON] RabbitMQ Tutorial 5 (Topic)

RabbitMQ Tutorial 5 https://www.rabbitmq.com/tutorials/tutorial-five-python.html It is a translation of. We look forward to pointing out any translation errors.

Prerequisites

This tutorial assumes that RabbitMQ is installed and running on the standard port (5672) on the local host. If you want to use a different host, port, or credentials, you need to adjust your connection settings.

If a problem occurs

If you encounter any issues through this tutorial, you can contact us through the mailing list.

topic

(Using pika 0.9.8 Python client)

In the previous tutorial, we improved the logging system. Instead of a fanout exchange that can only broadcast a dummy, you can now use a direct exchange to selectively receive logs.

We've improved the system using direct exchanges, but there are still limitations. Direct exchange does not allow routing based on multiple criteria.

In our logging system, we want to subscribe not only based on severity, but also on the source that publishes the log. You may know this concept by unix syslog tool, it logs based on severity (information / warning / critical ...) and facility (auth / cron / kern ...). Route

This tool is very flexible, you may want to get all the logs coming from "kern", not just the serious errors coming from "cron".

To achieve this with our logging system, we need to learn about more complex topic exchanges.

Topic exchange

Messages sent to the topic exchange cannot have any routing_key, it must be a dot-separated list of words. The word is arbitrary, but usually specifies some characteristics related to the message. Examples of routing keys: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". The routing key can contain any number of words, up to a limit of 255 bytes.

The binding key must be of the same format. The logic of a topic exchange is similar to that of a direct exchange: messages sent with a particular routing key are delivered to all queues bound by a matching binding key. However, there are two important special cases for binding keys:

\ * (Star) can replace just one word. \ # (Hash) can replace zero or more words.

It's easiest to explain with an example:

(Figure)

In this example, we will send a message that represents an animal. The message is sent with a routing key consisting of the following three words (and two dots): The first word of the routing key describes "agility", the second word describes "color", and the third word describes "seed": "(agility). (Color). (Seed)"

Create three bindings, "Q1" is combined with "\ *. Orange \ *" and "Q2" is combined with "\ *. \ *. Rabbit" and "lazy. #".

These bindings can be summarized as follows:

"Q1" is interested in all orange animals. "Q2" wants to hear everything about rabbits and everything about lazy animals.

Messages with the routing key set to "quick.orange.rabbit" will be delivered to both queues. The message "lazy.orange.elephant" also goes to both. On the other hand, "quick.orange.fox" goes only to the first queue and "lazy.brown.fox" goes only to the second. "lazy.pink.rabbit" matches the two bindings in the second queue, but is delivered only once to the second queue. "quick.brown.fox" does not match any binding and will be discarded.

What if you break your promise and send a message with a single word or four words, such as "orange" or "quick.orange.male.rabbit"? These messages are lost because they do not match any binding.

On the other hand, "lazy.orange.male.rabbit" has four words, but it matches the last binding, so it is delivered to the second queue.

Topic exchange

Topic exchanges are powerful and can behave like any other exchange.

The queue is the binding key "#When combined with (hash), like a fanout exchange, you will receive all messages regardless of the routing key.

special character"*"(Star),"#If no (hash) is used in the binding, the topic exchange behaves just like a direct exchange.

All summary

The logging system uses topic exchanges. Let's start assuming that the log routing key has two words: "(Facility). (Severity)".

The code is almost the same as the previous tutorial.

Code of emit_log_topic.py

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='topic_logs',
10                             type='topic')
11
12    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
13    message = ' '.join(sys.argv[2:]) or 'Hello World!'
14    channel.basic_publish(exchange='topic_logs',
15                          routing_key=routing_key,
16                          body=message)
17    print " [x] Sent %r:%r" % (routing_key, message)
18    connection.close()

Code for receive_logs_topic.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='topic_logs',
10                             type='topic')
11
12    result = channel.queue_declare(exclusive=True)
13    queue_name = result.method.queue
14
15    binding_keys = sys.argv[1:]
16    if not binding_keys:
17        print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
18        sys.exit(1)
19
20    for binding_key in binding_keys:
21        channel.queue_bind(exchange='topic_logs',
22                           queue=queue_name,
23                           routing_key=binding_key)
24
25    print ' [*] Waiting for logs. To exit press CTRL+C'
26
27    def callback(ch, method, properties, body):
28        print " [x] %r:%r" % (method.routing_key, body,)
29
30    channel.basic_consume(callback,
31                          queue=queue_name,
32                          no_ack=True)
33
34    channel.start_consuming()

To receive all logs:

python receive_logs_topic.py "#"

To receive all logs from facility "kern":

python receive_logs_topic.py "kern.*"

If you only want to hear the "critical" log:

python receive_logs_topic.py "*.critical"

You can generate multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"

To publish a log of the routing key "kern.critical":

python emit_log_topic.py "kern.critical" "A critical kernel error"

Play with this program. Note that the code does not make assumptions about routing or binding keys, it can be run with parameters for more than one routing key.

Some challenges:

Will the "\ *" binding catch messages sent with an empty routing key? Does "#. \ *" Capture messages keyed by the string ".."? Do you want to capture a single word key message? What is the difference between "a. \ *. #" And "a. #"?

Move on to Tutorial 6 to learn about RPC.

Recommended Posts

RabbitMQ Tutorial 5 (Topic)
RabbitMQ Tutorial 6 (RPC)
RabbitMQ Tutorial 4 (Routing)
RabbitMQ Tutorial 2 (Work Queue)
RabbitMQ Tutorial 3 (Publish / Subscribe)
RabbitMQ Tutorial 1 ("Hello World!")
sqlalchemy tutorial
PyODE Tutorial 2
Python tutorial
PyODE Tutorial 1
PyODE Tutorial 3
TensorFlow tutorial tutorial