[PYTHON] I tried task queuing with Celery


I tried out Celery, which provides a distributed task queue, and wrote up a sample here.



celery installation

pip install celery

Celery 4 and later do not support Windows, so on Windows specify the last version that supported it.

Installation of celery (windows)

pip install celery==3.1.25

Worker code

The worker is what actually executes the task. The task is defined in a module (tasks.py here).


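from celery import Celery

# backend: where task results are stored; broker: where task messages are queued
app = Celery('tasks', backend='rpc://', broker='amqp://[email protected]//')

@app.task
def add(x, y):
    # Return the sum so the caller can read it through the result backend
    return x + y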

Start this as a worker. The broker is a RabbitMQ instance that is already running, and rpc:// is specified as the result backend, the place where task results are saved. In production, something like Redis would apparently be used as the storage destination instead.

$ celery -A tasks worker --loglevel=info

the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


 -------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x22d0e56d080
- ** ---------- .> transport:   amqp://guest:**@
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

  . tasks.add

[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.

The worker started up without problems.
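The warning printed at startup is about pickle being accepted by default. If only JSON is needed, a settings module along the following lines should silence it. This is a minimal sketch using the Celery 3.1 setting names; the file name celeryconfig.py is an assumption, and the module would have to be loaded with app.config_from_object('celeryconfig') in tasks.py.

```python
# celeryconfig.py (hypothetical file name)
# Celery 3.1 setting names; Celery 4+ uses lowercase equivalents
# such as accept_content.

# Only accept JSON-serialized messages, so pickle is never deserialized.
CELERY_ACCEPT_CONTENT = ['json']

# Serialize outgoing task messages and stored results as JSON too.
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```

With JSON, task arguments and return values are limited to JSON-compatible types, which is fine for add(1, 2).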

Executing a task

Caller code and execution result

Caller code

>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
# Presumably the UUID of the AsyncResult is what links it to the result stored in Redis etc.
>>> async_result.ready()
>>> async_result.result

Calling a task through its delay method puts it on the queue. Once async_result.ready() returns True, the return value can be read from async_result.result.
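The ready-then-result sequence can be wrapped in a small polling helper. The sketch below is only an illustration: wait_for_result and its timeout and interval parameters are names made up here, and the function relies on nothing more than the ready() method and result attribute shown above.

```python
import time


def wait_for_result(async_result, timeout=10.0, interval=0.1):
    """Poll ready() until the task finishes, then return its result.

    Hypothetical helper: works with any object that exposes ready()
    and result, such as the AsyncResult returned by add.delay().
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish within %.1f s" % timeout)
        time.sleep(interval)
    return async_result.result
```

It would be called as wait_for_result(add.delay(1, 2)). In practice, the get(timeout=...) method of AsyncResult already provides a blocking wait, so a helper like this is mainly useful for seeing what the polling looks like.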

Worker behavior when calling

This is what happens when a task is sent to a worker that is already running. As far as I can tell, the task was executed without problems.

Behavior on the worker side

[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in 0s: 3
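To make the flow in these logs concrete, here is a toy model of the same idea in pure Python: the caller enqueues a task under a UUID, a worker picks it up, and the result is stored under that UUID. This is only an analogy built on the standard library's queue and threading modules, not how Celery is implemented; the names task_queue, result_store and delay are made up for illustration.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()   # stands in for the broker (RabbitMQ)
result_store = {}            # stands in for the result backend
store_lock = threading.Lock()


def add(x, y):
    return x + y


def delay(func, *args):
    """Enqueue a call and return the id its result will be stored under."""
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, func, args))
    return task_id


def worker():
    """Take tasks off the queue until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            break
        task_id, func, args = item
        value = func(*args)
        with store_lock:
            result_store[task_id] = value


thread = threading.Thread(target=worker)
thread.start()
task_id = delay(add, 1, 2)
task_queue.put(None)         # tell the worker to stop after the queued task
thread.join()
print(result_store[task_id])  # 3
```

The caller only ever holds the task id, which is why the result has to be looked up in a shared store rather than returned directly.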

Save results

To save task results, specify a result backend when creating the Celery instance. This time rpc:// is specified, but storing results in Redis or similar seems to be recommended for production. (https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results)
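For reference, a Redis-backed setup might look like the settings module below. It is a sketch under several assumptions: a Redis server on localhost at the default port, the redis Python package installed, the Celery 3.1 setting names, and a file named celeryconfig.py that is loaded with app.config_from_object('celeryconfig'). None of this was tried in this article.

```python
# celeryconfig.py (hypothetical file name)
# Celery 3.1 setting names; Celery 4+ uses lowercase equivalents
# such as result_backend.

# Assumed local Redis instance, database 0.
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Expire stored results after one hour (value in seconds) so that
# Redis does not keep them indefinitely.
CELERY_TASK_RESULT_EXPIRES = 3600
```

The expiry value is an arbitrary example; it should be long enough for callers to fetch their results.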


Task queuing with Celery turned out to be fairly easy. A simple task processing setup needs only Celery, RabbitMQ, and a place to save results, and it looks like one can be put together quickly.

Reference material

- Celery official site
- Celery official documentation

