[PYTHON] I tried task queuing with Celery

Purpose

This post walks through a small sample that runs tasks with Celery, a distributed task queue.

Premise

Installation

celery installation


pip install celery

Celery 4 and later do not support Windows, so on Windows specify the last version that does.

Installation of celery (Windows)


pip install celery==3.1.25

Worker code

The worker is the process that actually executes tasks. The task itself is defined in the following module.

tasks.py


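from celery import Celery

# Broker: RabbitMQ running at 192.168.0.3; result backend: rpc://
app = Celery('tasks', backend='rpc://', broker='amqp://guest@192.168.0.3//')


@app.task
def add(x, y):
    # Return the sum, so that add(1, 2) gives 3
    return x + y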

Start this module as a worker. It uses the RabbitMQ broker running at 192.168.0.3. rpc:// is specified as the result backend, which is where task results are saved. In production, a store such as Redis appears to be the usual choice for the backend.

$ celery -A tasks worker --loglevel=info

the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x22d0e56d080
- ** ---------- .> transport:   amqp://guest:**@192.168.0.3:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@192.168.0.3:5672//
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.

The worker started without problems.

Executing a task

Caller code and execution result

Caller code


>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
# Presumably the UUID of the AsyncResult is what ties it to the result stored in the backend (Redis etc.)
>>> async_result.ready()
True
>>> async_result.result
3

A task is queued by calling it through the delay method. Once async_result.ready() returns True, the return value is available from async_result.result.

Worker behavior when calling

This is what the already-running worker logs when the task is sent to it. As far as I can tell, the task was received and executed without problems.

Behavior on the worker side


[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in 0s: 3

Save results

To save task results, specify a backend when creating the Celery instance. rpc:// is used here, but storing results in Redis or a similar store appears to be recommended for production (https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results).

Summary

Task queuing with Celery turned out to be fairly easy. A simple task processing setup needs only Celery, RabbitMQ, and a result backend, and it seems it can be put together quickly.

Reference material

- Celery Official
- Celery Official Document
