This is a translation of RabbitMQ Tutorial 6: https://www.rabbitmq.com/tutorials/tutorial-six-python.html. Corrections of translation errors are welcome.
This tutorial assumes that RabbitMQ is installed and running on the standard port (5672) on the local host. If you want to use a different host, port, or credentials, you need to adjust your connection settings.
If you run into problems at any point in this tutorial, you can contact us through the mailing list.
In the second tutorial, you learned how to use work queues to distribute time-consuming tasks among multiple workers.
But what if you need to execute a function on a remote computer and wait for the result? That's another story. This pattern is commonly known as remote procedure call or RPC.
In this tutorial, we will use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming task worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
To illustrate how the RPC service could be used, we'll create a simple client class. It exposes a method named "call" which sends an RPC request and blocks until the answer is received:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
Notes on RPC
Although RPC is a pretty common pattern in computing, it is often criticized. The problems arise when a programmer is not aware whether a function call is local or a slow RPC. Confusion like that results in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
With that in mind, consider the following advice:
Make sure it is clear which function calls are local and which are remote.
Document your system. Clarify the dependencies between components.
Handle error cases. How should the client react when the RPC server is down for an extended period of time?
When in doubt, avoid RPC. If you can, use an asynchronous pipeline instead of RPC-like blocking: results are asynchronously pushed to a next computation stage.
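To make the last point concrete, here is a minimal standard-library sketch of the asynchronous-pipeline idea (the stage functions are hypothetical, and a thread pool stands in for the message broker): the caller submits work and registers the next stage as a callback instead of blocking on the result.

```python
from concurrent.futures import ThreadPoolExecutor

results = []

def slow_stage(x):
    # stands in for a slow remote computation
    return x * 2

def next_stage(future):
    # the result is pushed asynchronously to the next computation stage
    results.append(future.result())

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_stage, 21)
    future.add_done_callback(next_stage)
    # the caller is free to do other work here instead of blocking

print(results)
```

The `with` block waits for outstanding work on exit, so by the time `print` runs the callback has fired.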
In general, doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response, the client needs to send the address of a "callback" queue along with the request. Let's try it:
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                          reply_to=callback_queue,
                      ),
                      body=request)
# ... and some code to read a response message from the callback_queue ...
Message properties
The AMQP protocol defines 14 properties that are carried with a message. Most properties are rarely used, with the following exceptions:
delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). We used this property in the second tutorial.
content_type: Used to describe the MIME type of the encoding. For example, for the often used JSON encoding, it is good practice to set this property to "application/json".
reply_to:Usually used to name callback queues.
correlation_id:Useful for mapping RPC responses to requests.
In the method presented above we suggested creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way: let's create a single callback queue per client.
That raises a new issue: having received a response in that queue, it's not clear to which request the response belongs. That's when the correlation_id property is used. We're going to set it to a unique value for every request. Later, when a message is received in the callback queue, we'll look at this property, and based on it match a response with its request. If we see an unknown correlation_id value, we may safely discard the message: it doesn't belong to our requests.
Why should we ignore unknown messages in the callback queue, rather than fail with an error? It's due to a possible race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle duplicate responses gracefully, and the RPC should ideally be idempotent.
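The matching-and-discarding logic can be sketched in isolation, without a broker (the function and variable names here are illustrative, not part of pika):

```python
import uuid

pending = {}  # correlation_id -> request payload awaiting a response

def send_request(n):
    # a unique correlation_id is generated per request
    corr_id = str(uuid.uuid4())
    pending[corr_id] = n
    return corr_id

def on_response(corr_id, body):
    if corr_id not in pending:
        return None  # unknown correlation_id: safely discard the message
    pending.pop(corr_id)
    return body

req_id = send_request(30)
print(on_response('not-ours', b'whatever'))  # discarded
print(on_response(req_id, b'832040'))        # matched to our request
```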
Our RPC works as follows:
When the client starts up, it creates an anonymous, exclusive callback queue.
For an RPC request, the client sends a message with two properties: reply_to, set to the callback queue, and correlation_id, set to a unique value for every request.
The request is sent to the "rpc_queue" queue.
The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the client, using the queue from the reply_to field.
The client waits for data in the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.
Code for rpc_server.py:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6
7 channel = connection.channel()
8
9 channel.queue_declare(queue='rpc_queue')
10
11 def fib(n):
12 if n == 0:
13 return 0
14 elif n == 1:
15 return 1
16 else:
17 return fib(n-1) + fib(n-2)
18
19 def on_request(ch, method, props, body):
20 n = int(body)
21
22 print(" [.] fib(%s)" % n)
23 response = fib(n)
24
25 ch.basic_publish(exchange='',
26 routing_key=props.reply_to,
27 properties=pika.BasicProperties(correlation_id = \
28 props.correlation_id),
29 body=str(response))
30 ch.basic_ack(delivery_tag = method.delivery_tag)
31
32 channel.basic_qos(prefetch_count=1)
33 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
34
35 print(" [x] Awaiting RPC requests")
36 channel.start_consuming()
The server code is pretty simple:
(4) As usual, we start by establishing the connection and declaring the queue.
(11) We declare our Fibonacci function. It assumes only valid positive integer input. (Don't expect it to work for big numbers; it is probably the slowest recursive implementation possible.)
(19) We declare the callback for basic_consume, the core of the RPC server. It's executed when a request is received. It does the work and sends the response back.
(32) We might want to run more than one server process. In order to spread the load equally over multiple servers, we need to set prefetch_count.
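As an aside on point (11): if the naive recursion ever became a bottleneck, a memoized variant would compute each value once. This is a sketch using functools.lru_cache, not part of the original tutorial:

```python
from functools import lru_cache

@lru_cache(maxsize=None)  # cache each fib(n) so it is computed only once
def fib(n):
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)

print(fib(30))  # -> 832040, computed in linear rather than exponential time
```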
Code for rpc_client.py:
1 #!/usr/bin/env python
2 import pika
3 import uuid
4
5 class FibonacciRpcClient(object):
6 def __init__(self):
7 self.connection = pika.BlockingConnection(pika.ConnectionParameters(
8 host='localhost'))
9
10 self.channel = self.connection.channel()
11
12 result = self.channel.queue_declare(queue='', exclusive=True)
13 self.callback_queue = result.method.queue
14
15 self.channel.basic_consume(queue=self.callback_queue,
16 on_message_callback=self.on_response, auto_ack=True)
17
18 def on_response(self, ch, method, props, body):
19 if self.corr_id == props.correlation_id:
20 self.response = body
21
22 def call(self, n):
23 self.response = None
24 self.corr_id = str(uuid.uuid4())
25 self.channel.basic_publish(exchange='',
26 routing_key='rpc_queue',
27 properties=pika.BasicProperties(
28 reply_to = self.callback_queue,
29 correlation_id = self.corr_id,
30 ),
31 body=str(n))
32 while self.response is None:
33 self.connection.process_data_events()
34 return int(self.response)
35
36 fibonacci_rpc = FibonacciRpcClient()
37
38 print(" [x] Requesting fib(30)")
39 response = fibonacci_rpc.call(30)
40 print(" [.] Got %r" % response)
The client code is slightly more involved:
(7) We establish a connection and channel, and declare an exclusive "callback" queue for replies.
(16) We subscribe to the callback queue, so that we can receive RPC responses.
(18) The on_response callback executed on every response does a very simple job: for every response message it checks whether the correlation_id is the one we're looking for. If so, it saves the response in self.response and breaks the consuming loop.
(23) Next, we define our main call method, which makes the actual RPC request.
(24) In this method, we first generate a unique correlation_id number and save it; the on_response callback will use this value to catch the appropriate response.
(25) Next, we publish the request message, with two properties: reply_to and correlation_id.
(32) At this point we can sit back and wait until the proper response arrives.
(33) And finally, we return the response back to the user.
The RPC service is ready. You can start the server:
$ python rpc_server.py
[x] Awaiting RPC requests
Run the client to request the Fibonacci number:
$ python rpc_client.py
[x] Requesting fib(30)
[.] Got 832040
This design is not the only possible implementation of the RPC service, but it has some important advantages.
If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console. On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result, the RPC client needs only one network round trip for a single RPC request.
This code is still fairly simplified and does not solve more complex (but important) issues such as:
How should the client react if there are no servers running? Should a client have some kind of timeout for the RPC? If the server malfunctions and raises an exception, should it be forwarded to the client? Protecting against invalid incoming messages (e.g. checking bounds) before processing.
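As a sketch of the timeout question, the client's blocking wait loop could be bounded by a deadline. This is a standard-library illustration of the pattern only; RpcTimeout and the poll() hook are hypothetical names, and in the real client poll() would wrap connection.process_data_events():

```python
import time

class RpcTimeout(Exception):
    pass

def wait_for_response(poll, timeout=5.0):
    # poll() returns the response, or None if nothing has arrived yet
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = poll()
        if response is not None:
            return response
        time.sleep(0.05)  # avoid spinning; a real client would block in the I/O loop
    raise RpcTimeout("no reply from RPC server within %.1fs" % timeout)

# usage sketch: a poll that never yields a response times out
try:
    wait_for_response(lambda: None, timeout=0.2)
    timed_out = False
except RpcTimeout:
    timed_out = True
print(timed_out)
```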
If you want to experiment, you may find the rabbitmq-management plugin useful for viewing the queues.