[PYTHON] Benefits and examples of using RabbitMq

Introduction

I summarized how to use RabbitMq in previous time and previous time. Now that I've expanded the scope a bit further and investigated how to use RabbitMq from the relationship between producers and consumers, I'll summarize the benefits with a simple example.

environment

Benefits of using RabbitMq

--The messages in the queue can be accumulated and executed sequentially. --Easy to add producers and consumers --Producers and consumers can be added at any time --Message distribution can be distributed

Queue messages can be accumulated and executed sequentially

The messages sent from the producer are queued, and the consumer receives them in that order and executes the process.

Situation example

When running a job that cannot be executed at the same time on a website used by multiple people.

producer

Sample source

The process of queuing ends without waiting for the consumer's processing, so it only sends the message twice.

client_main.py



import pika
import datetime
    
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

print('Send Message 1 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World 1')
print('Send Message 1 End. {}'.format(datetime.datetime.now()))

print('Send Message 2 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World 2')
print('Send Message 2 End. {}'.format(datetime.datetime.now()))

Execution result

When you run the sample, the time for sending and receiving is displayed and it ends. If you look at Queues on the administration screen, you can see that there are two in Message.


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 1 Start. 2020-03-08 18:53:45.658027
Send Message 1 End. 2020-03-08 18:53:45.659027
Send Message 2 Start. 2020-03-08 18:53:45.659027
Send Message 2 End. 2020-03-08 18:53:45.660026

queue_single.png

Consumer

Sample source

The receiver simply prepares a function that takes the process from the queue and executes it.

host_main.py



import datetime
import pika

pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("{} Received. {}".format(body, datetime.datetime.now()))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue='hello', on_message_callback=callback)

channel.start_consuming()

Execution result

When the sample is executed, the processing is performed in the same order as when it was sent properly. Since we processed the message, if you look at Queues on the management screen, you can see that Message is 0.


PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World 1' Received. 2020-03-08 19:02:11.756469
b'Hello World 2' Received. 2020-03-08 19:02:11.757469

queue_single_2.png

Easy to add producers and consumers

Since producers and consumers can run independently, it is easy to increase the number of producers and consumers. In addition, message distribution is distributed by default, so adding multiple consumers will naturally distribute it.

Situation example

Since the number of system users is increasing and back processing is insufficient, we will increase the number of consumers to deal with it.

producer

Sample source

I want to increase the number of consumers while the producer is running, so I send a message about 20 times and increase the number of consumers in the meantime.


import time
import datetime
import pika
    
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

for i in range(20):
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World {}'.format(i))
    print('Send Message {} Exec. {}'.format(i, datetime.datetime.now()))
    time.sleep(2)

connection.close()

Consumer

Sample source

The receiver has prepared two functions that take the process from the queue and execute it. I put a number in the message to be displayed so that it can be identified as each source.

host1_main.py



import datetime
import pika

pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("Host1 {} Received. {}".format(body, datetime.datetime.now()))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue='hello', on_message_callback=callback)

channel.start_consuming()

host2_main.py



import datetime
import pika

pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("Host 2 {} Received. {}".format(body, datetime.datetime.now()))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue='hello', on_message_callback=callback)

channel.start_consuming()

Execution result

When executing, execute in the following order so that the sending side increases the receiving side during startup.

  1. Run host1_main.py
  2. Run client_main.py
  3. host1_main.py receives some messages
  4. Run host2_main.py

View client_main.py


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 19:48:12.834261
Send Message 1 Exec. 2020-03-08 19:48:14.835843
Send Message 2 Exec. 2020-03-08 19:48:16.838815
Send Message 3 Exec. 2020-03-08 19:48:18.839815
Send Message 4 Exec. 2020-03-08 19:48:20.840815
Send Message 5 Exec. 2020-03-08 19:48:22.841815
Send Message 6 Exec. 2020-03-08 19:48:24.842788
Send Message 7 Exec. 2020-03-08 19:48:26.843861
Send Message 8 Exec. 2020-03-08 19:48:28.845190
Send Message 9 Exec. 2020-03-08 19:48:30.845934

Display of host1_main.py


PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 0' Received. 2020-03-08 19:48:12.836260
Host1 b'Hello World 1' Received. 2020-03-08 19:48:14.839838
Host1 b'Hello World 2' Received. 2020-03-08 19:48:16.841816
Host1 b'Hello World 3' Received. 2020-03-08 19:48:18.840818
Host1 b'Hello World 4' Received. 2020-03-08 19:48:20.842817
Host1 b'Hello World 6' Received. 2020-03-08 19:48:24.844791
Host1 b'Hello World 8' Received. 2020-03-08 19:48:28.847190

View host2_main.py


PS C:\Users\xxxx\program\python\pika> python .\host2_main.py
Host 2 b'Hello World 5' Received. 2020-03-08 19:48:22.843819
Host 2 b'Hello World 7' Received. 2020-03-08 19:48:26.845863
Host 2 b'Hello World 9' Received. 2020-03-08 19:48:30.847937

Looking at the execution result, you can see that the message is coming to the consumer side that increased at the timing when the consumer was increased. In addition, the distributed processing of messages is working normally, and the messages are processed alternately.

Producers and consumers can be added at any time

Same as the easy example of adding producers and consumers.

Message distribution can be distributed

Same as the easy example of adding producers and consumers.

Messages can be exchanged between different languages

RabbitMq clients have libraries in various languages, so you can exchange messages without being aware of the differences between languages. In addition, not only producers and consumers, but also producers and consumers operate independently, so there is no need to unify the languages.

Situation example

When you want to provide a service that runs in various languages as one service, and you want to create an integrated service by using the old service as little as possible.

producer

Sample source

Use the python producer above.

Consumer

Sample source

Use `host1_main.py` written above as python. Use the following source as Kotlin as another language.

host_kotlin_main.kt



import com.rabbitmq.client.AMQP
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope

fun main(argv: Array<String>) {
    val factory = ConnectionFactory()
    factory.host = "localhost"
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    channel.queueDeclare("hello", false, false, false, null)

    val callback = object : DefaultConsumer(channel) {
        override fun handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray) {
            val message = String(body, charset("UTF-8"))
            println("Host Kotlin '$message' Received")
            channel.basicAck(envelope.deliveryTag, false)
        }
    }
    channel.basicConsume("hello", false, callback)
}

Execution result

When I ran it, I started the consumer and then the producer.

View client_main.py


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 21:21:06.572582
Send Message 1 Exec. 2020-03-08 21:21:08.573535
Send Message 2 Exec. 2020-03-08 21:21:10.574442
Send Message 3 Exec. 2020-03-08 21:21:12.575198

Display of host1_main.py


PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 1' Received. 2020-03-08 21:21:08.575536
Host1 b'Hello World 3' Received. 2020-03-08 21:21:12.577199

View host_kotlin_mainKt.kt


"C:\Program Files\Java\bin\java.exe" .... Host_kotlin_mainKt
Host Kotlin 'Hello World 0' Received
Host Kotlin 'Hello World 2' Received

If you look at the execution result, you can see that it works normally regardless of whether the consumer is python or Kotlin (Java).

in conclusion

I tried to summarize the convenient usage of RabbitMq. Most of the RabbitMq was queuing or AMQP, and there was almost no RabbitMq original. Personally, I found it easy to use because it naturally disperses and it is very easy to add producers and consumers. This time, I focused on the relationship between producers and consumers, but when I focused on the queue, there were still many functions. I think it will take a lot of time to grasp everything.

Recommended Posts

Benefits and examples of using RabbitMq
Example of using class variables and class methods
Benefits of using slugfield in Django's model
Comparison of k-means implementation examples of scikit-learn and pyclustering
Example of using lambda
Collection and automation of erotic images using deep learning
Calculation of odometry using CNN and depth estimation Part 2 (CNN SLAM # 2)
Examination of Forecasting Method Using Deep Learning and Wavelet Transform-Part 2-
Get data using Ministry of Internal Affairs and Communications API
View the contents of the queue using the RabbitMQ Management Web API
Using MLflow with Databricks ② --Visualization of experimental parameters and metrics -
Verification and implementation of video reconstruction method using GRU and Autoencoder
Reconstruction of cone beam CT (CBCT) using python and TIGRE
Mechanism of pyenv and virtualenv
Implementation of TF-IDF using gensim
Pre-processing and post-processing of pytest
Benefits of refining Django's Model
Quicksort details and code examples
Combination of recursion and generator
Combination of anyenv and direnv
Explanation and implementation of SocialFoceModel
python: Basics of using scikit-learn ①
Differentiation of sort and generalization of sort
Coexistence of pyenv and autojump
Introduction of caffe using pyenv
When using if and when using while
Use and integration of "Shodan"
Problems of liars and honesty
This and that using reflect
Try using pytest-Overview and Samples-
Occurrence and resolution of tensorflow.python.framework.errors_impl.FailedPreconditionError
Comparison of Apex and Lamvery
Source installation and installation of Python
Introduction and tips of mlflow.Tracking
A memorandum of using eigen3
Find the critical path of PERT using breadth-first search and depth-first search
Flow of getting the result of asynchronous processing using Django and Celery
A little more about references ~ Using Python and Java as examples ~
Aligning scanned images of animated video paper using OpenCV and Python
Get and set the value of the dropdown menu using Python and Selenium
python> No #ifdef> Another solution> __debug__ Pros and cons of using
Predict the rise and fall of BTC price using Qore SDK
Implementation of Datetime picker action using line-bot-sdk-python and implementation sample of Image Carousel
[Ruby on Rails] Display and pinning of GoolgeMAP using Google API