I summarized how to use RabbitMq in previous time and previous time. Now that I've expanded the scope a bit further and investigated how to use RabbitMq from the relationship between producers and consumers, I'll summarize the benefits with a simple example.
--The messages in the queue can be accumulated and executed sequentially. --Easy to add producers and consumers --Producers and consumers can be added at any time --Message distribution can be distributed
The messages sent from the producer are queued, and the consumer receives them in that order and executes the process.
When running a job that cannot be executed at the same time on a website used by multiple people.
The process of queuing ends without waiting for the consumer's processing, so it only sends the message twice.
client_main.py
import pika
import datetime
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
print('Send Message 1 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 1')
print('Send Message 1 End. {}'.format(datetime.datetime.now()))
print('Send Message 2 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 2')
print('Send Message 2 End. {}'.format(datetime.datetime.now()))
When you run the sample, the time for sending and receiving is displayed and it ends. If you look at Queues on the administration screen, you can see that there are two in Message.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 1 Start. 2020-03-08 18:53:45.658027
Send Message 1 End. 2020-03-08 18:53:45.659027
Send Message 2 Start. 2020-03-08 18:53:45.659027
Send Message 2 End. 2020-03-08 18:53:45.660026
The receiver simply prepares a function that takes the process from the queue and executes it.
host_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("{} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
When the sample is executed, the processing is performed in the same order as when it was sent properly. Since we processed the message, if you look at Queues on the management screen, you can see that Message is 0.
PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World 1' Received. 2020-03-08 19:02:11.756469
b'Hello World 2' Received. 2020-03-08 19:02:11.757469
Since producers and consumers can run independently, it is easy to increase the number of producers and consumers. In addition, message distribution is distributed by default, so adding multiple consumers will naturally distribute it.
Since the number of system users is increasing and back processing is insufficient, we will increase the number of consumers to deal with it.
I want to increase the number of consumers while the producer is running, so I send a message about 20 times and increase the number of consumers in the meantime.
import time
import datetime
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
for i in range(20):
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World {}'.format(i))
print('Send Message {} Exec. {}'.format(i, datetime.datetime.now()))
time.sleep(2)
connection.close()
The receiver has prepared two functions that take the process from the queue and execute it. I put a number in the message to be displayed so that it can be identified as each source.
host1_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host1 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
host2_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host 2 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
When executing, execute in the following order so that the sending side increases the receiving side during startup.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 19:48:12.834261
Send Message 1 Exec. 2020-03-08 19:48:14.835843
Send Message 2 Exec. 2020-03-08 19:48:16.838815
Send Message 3 Exec. 2020-03-08 19:48:18.839815
Send Message 4 Exec. 2020-03-08 19:48:20.840815
Send Message 5 Exec. 2020-03-08 19:48:22.841815
Send Message 6 Exec. 2020-03-08 19:48:24.842788
Send Message 7 Exec. 2020-03-08 19:48:26.843861
Send Message 8 Exec. 2020-03-08 19:48:28.845190
Send Message 9 Exec. 2020-03-08 19:48:30.845934
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 0' Received. 2020-03-08 19:48:12.836260
Host1 b'Hello World 1' Received. 2020-03-08 19:48:14.839838
Host1 b'Hello World 2' Received. 2020-03-08 19:48:16.841816
Host1 b'Hello World 3' Received. 2020-03-08 19:48:18.840818
Host1 b'Hello World 4' Received. 2020-03-08 19:48:20.842817
Host1 b'Hello World 6' Received. 2020-03-08 19:48:24.844791
Host1 b'Hello World 8' Received. 2020-03-08 19:48:28.847190
PS C:\Users\xxxx\program\python\pika> python .\host2_main.py
Host 2 b'Hello World 5' Received. 2020-03-08 19:48:22.843819
Host 2 b'Hello World 7' Received. 2020-03-08 19:48:26.845863
Host 2 b'Hello World 9' Received. 2020-03-08 19:48:30.847937
Looking at the execution result, you can see that the message is coming to the consumer side that increased at the timing when the consumer was increased. In addition, the distributed processing of messages is working normally, and the messages are processed alternately.
Same as the easy example of adding producers and consumers.
Same as the easy example of adding producers and consumers.
RabbitMq clients have libraries in various languages, so you can exchange messages without being aware of the differences between languages. In addition, not only producers and consumers, but also producers and consumers operate independently, so there is no need to unify the languages.
When you want to provide a service that runs in various languages as one service, and you want to create an integrated service by using the old service as little as possible.
Use the python producer above.
Use `host1_main.py`
written above as python. Use the following source as Kotlin as another language.
host_kotlin_main.kt
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
fun main(argv: Array<String>) {
val factory = ConnectionFactory()
factory.host = "localhost"
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare("hello", false, false, false, null)
val callback = object : DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray) {
val message = String(body, charset("UTF-8"))
println("Host Kotlin '$message' Received")
channel.basicAck(envelope.deliveryTag, false)
}
}
channel.basicConsume("hello", false, callback)
}
When I ran it, I started the consumer and then the producer.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 21:21:06.572582
Send Message 1 Exec. 2020-03-08 21:21:08.573535
Send Message 2 Exec. 2020-03-08 21:21:10.574442
Send Message 3 Exec. 2020-03-08 21:21:12.575198
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 1' Received. 2020-03-08 21:21:08.575536
Host1 b'Hello World 3' Received. 2020-03-08 21:21:12.577199
"C:\Program Files\Java\bin\java.exe" .... Host_kotlin_mainKt
Host Kotlin 'Hello World 0' Received
Host Kotlin 'Hello World 2' Received
If you look at the execution result, you can see that it works normally regardless of whether the consumer is python or Kotlin (Java).
I tried to summarize the convenient usage of RabbitMq. Most of the RabbitMq was queuing or AMQP, and there was almost no RabbitMq original. Personally, I found it easy to use because it naturally disperses and it is very easy to add producers and consumers. This time, I focused on the relationship between producers and consumers, but when I focused on the queue, there were still many functions. I think it will take a lot of time to grasp everything.
Recommended Posts