-Asynchronous processing by combining Django and Celery: how to save the results using django-celery-results. This is a memorandum of stumbling blocks, such as the status remaining PENDING, or the result not being written to the DB, until the asynchronous process completes.
--Environment - python: v.3.7.7 - redis: v.6.0.9
--Edit settings.py
INSTALLED_APPS = [
...
'django_celery_results', #add to
]
CELERY_BROKER_URL = "redis:// [redis host]:6379" #Specify redis as a broker
CELERY_RESULT_BACKEND = "django-db" #The result is saved in the DB specified by django. This article assumes MySQL.
CELERY_TASK_TRACK_STARTED = True #Settings for confirming that the task has started (described later)
--Executable file structure --The structure of the files created below is as follows
├── myapp
│ ├── myapp
│ │ ├── __init__.py
│ │ ├── settings.py
│ │ ├── celery.py
│ │ ├── urls.py
│ │ ├── tasks
│ │ │ ├── __init__.py
│ │ │ └── task.py
│ │ └── wsgi.py
│ └── manage.py
├── docker-compose.yml
└── requirements.txt
--Create celery.py
--Create the following celery.py
under the application directory (same level as urls.py) (almost the same as the official document)
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE',
'myapp.settings'
)
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object(
'django.conf:settings',
namespace='CELERY'
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
--Create task file
--In order to manage multiple asynchronous scripts, the tasks directory is cut and the scripts for tasks are placed under it.
--Prepare task.py
as follows
##Callee(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 seconds to confirm that the process ends asynchronously
message = "hello"
print(message) #Standard output
return message #Returns the result
--Run django application
$python manage.py makemigrations #Create a migration file (if needed)
$python manage.py migrate #execute migrate
$python manage.py runserver #Application execution
--Redis launch
# redis-server
--Celery launch --cd to the directory where manage.py is located and execute the following command
$celery -A myapp worker --concurrency=1 -l info
--Tasks can be executed with [your_task_function].delay()
--In the example below, it just waits 10 seconds and then returns "hello".
--In order to check the contents of the task, it is necessary to get the ID assigned to the task.
--The task object is returned as the return value of hello.delay ()
, and the id can be referenced.
--By holding the task ID in a DB etc., you can check the task status at any time inside the application or from the logic of another process.
from django_app.tasks import hello
from celery.result import AsyncResult


def hoge(project_id=1):
    """Enqueue the hello task and print its assigned task id.

    Args:
        project_id: value forwarded to the task. (BUG FIX: in the original
            snippet this name was undefined, causing a NameError.)

    Returns:
        The id uniquely assigned to the queued task.
    """
    # .delay() is all that is needed: it enqueues the task and returns
    # immediately with an AsyncResult while the work proceeds asynchronously.
    task = hello.delay(project_id)
    print("===============")
    print(task.id)  # the uniquely assigned task id
    return task.id
--Check the task status using the task ID obtained in the above example.
--Use AsyncResult
to check the status and result
--You can get the task object by passing the task ID.
--You can get the status of the task with the status
method
--PENDING
: Waiting for execution.
--STARTED
: Execution start status. (It is not output by default. The output method will be described later.)
--SUCCESS
: Execution completed normally.
--FAILURE
: Execution terminated abnormally. (Note: Celery's actual state name is FAILURE, not FAILED -- see the DB dump later in this article.)
from django_app.tasks import hello
from celery.result import AsyncResult

import time  # BUG FIX: time.sleep is used below but was never imported


def hoge(project_id=1):
    """Enqueue the hello task and poll its state twice via AsyncResult.

    Args:
        project_id: value forwarded to the task. (BUG FIX: in the original
            snippet this name was undefined, causing a NameError.)
    """
    # Enqueue and return immediately; the task runs asynchronously.
    task = hello.delay(project_id)
    print("===============")
    task_id = task.id
    print(task_id)  # the uniquely assigned task id
    # State right after submission -- normally PENDING (or STARTED when
    # CELERY_TASK_TRACK_STARTED is enabled).
    task_1 = AsyncResult(task_id)
    print(task_1.status)
    time.sleep(1)
    print("===============")
    # State one second into processing.
    task_2 = AsyncResult(task_id)
    print(task_2.status)
--Save the result in DB
--You may not use this often, but it is one possible approach.
--By specifying CELERY_RESULT_BACKEND
in settings.py, the status / result of asynchronous processing is automatically saved in the DB. In this article, we will assume MySQL.
--You can search task information from django by specifying the task ID.
from django_app.tasks import hello
from django_celery_results.models import TaskResult

import time  # BUG FIX: time.sleep is used below but was never imported


def hoge(project_id=1):
    """Enqueue the hello task, then look up its state in the DB twice.

    Args:
        project_id: value forwarded to the task. (BUG FIX: in the original
            snippet this name was undefined, causing a NameError.)
    """
    # Enqueue and return immediately; the task runs asynchronously.
    task = hello.delay(project_id)
    print("===============")
    task_id = task.id  # the uniquely assigned task id
    print(task_id)
    # Query django-celery-results by task id. A row only exists once the
    # backend has written a state (see CELERY_TASK_TRACK_STARTED note later).
    task_model_1 = TaskResult.objects.filter(task_id=task_id)
    if task_model_1:  # queryset truthiness instead of len(...) > 0
        print(task_model_1[0].status)  # state right after submission
    time.sleep(1)
    task_model_2 = TaskResult.objects.filter(task_id=task_id)
    if task_model_2:
        print(task_model_2[0].status)  # state one second into processing
#Contents of MySQL
##The table is as follows.
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai |
+----------------------------------+
| auth_group |
| auth_group_permissions |
| auth_permission |
| django_admin_log |
| django_celery_results_taskresult | ## <-This has been added!
...
## The schema of the django_celery_results_taskresult table is as follows.
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| task_id | varchar(255) | NO | UNI | NULL | |
| status | varchar(50) | NO | MUL | NULL | |
| content_type | varchar(128) | NO | | NULL | |
| content_encoding | varchar(64) | NO | | NULL | |
| result | longtext | YES | | NULL | |
| date_done | datetime(6) | NO | MUL | NULL | |
| traceback | longtext | YES | | NULL | |
| meta | longtext | YES | | NULL | |
| task_args | longtext | YES | | NULL | |
| task_kwargs | longtext | YES | | NULL | |
| task_name | varchar(255) | YES | MUL | NULL | |
| worker | varchar(100) | YES | MUL | NULL | |
| date_created | datetime(6) | NO | MUL | NULL | |
+------------------+--------------+------+-----+---------+----------------+
--If CELERY_TASK_TRACK_STARTED
is not specified in settings.py, then until the task finishes and its result is obtained:
--The status obtained using AsyncResult
is still PENDING
--The contents of the task are not held in the DB. (Saved only after the result is available)
--In order to solve it, the following settings are put in settings.py
CELERY_TASK_TRACK_STARTED = True
--If you raise an error with raise, the result of the task will be FAILURE
--The error message resulting from raising is stored in task.result
##Callee(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 seconds to confirm that the process ends asynchronously
message = "hello"
raise Exception("my error message") #Error notification
#Results stored in MySQL
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
id: 31
task_id: be294008-d2fc-4760-9055-483efdaa4970
status: FAILURE
content_type: application/json
content_encoding: utf-8
result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
date_done: 2020-11-10 08:06:32.848782
traceback: Traceback (most recent call last):...
meta: {"children": []}
task_args: (4,)
task_kwargs: {}
task_name: myapp.tasks.task.hello
worker: celery@05ab2e4b5ee1
date_created: 2020-11-10 08:06:22.829301
Recommended Posts