[PYTHON] Flow of getting the result of asynchronous processing using Django and Celery

Overview

- Asynchronous processing by combining Django and Celery, and how to save the results using django-celery-results
- A memorandum of a stumbling block: the status stays PENDING, and the result is not written to the DB, until the asynchronous task completes


Premise

- Environment
  - Python: 3.7.7
  - Redis: 6.0.9


Installation
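Although the article only shows the settings, the packages themselves are installed with pip first (standard package names; a minimal sketch assuming Redis is used as the broker):

```shell
# Install Celery, the Django result backend, and the Redis client library.
pip install celery django-celery-results redis
```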

- Edit settings.py

INSTALLED_APPS = [
    ...
    'django_celery_results',  # added
]

CELERY_BROKER_URL = "redis://[redis host]:6379"  # use Redis as the broker
CELERY_RESULT_BACKEND = "django-db"  # save results to the DB configured in Django (MySQL in this article)
CELERY_TASK_TRACK_STARTED = True  # report that a task has started (described later)

Build

- File structure
  - The files created below are laid out as follows

├── myapp
│   ├── myapp
│   │   ├── __init__.py
│   │   ├── settings.py
│   │   ├── celery.py
│   │   ├── urls.py
│   │   ├── tasks
│   │   │   ├── __init__.py
│   │   │   └── task.py
│   │   └── wsgi.py
│   └── manage.py
├── docker-compose.yml
└── requirements.txt

- Create celery.py
  - Create the following celery.py under the application directory (same level as urls.py); it is almost identical to the official documentation

# celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'myapp.settings'
)

app = Celery('myapp') 

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(
    'django.conf:settings',
    namespace='CELERY'
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
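The official Celery/Django setup also has you load this app when Django starts, by importing it in the project's `__init__.py` (the one next to settings.py), so that `@shared_task` decorators use it:

```python
# myapp/__init__.py
from __future__ import absolute_import, unicode_literals

# Ensure the Celery app is loaded when Django starts so that
# @shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```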

- Create the task file
  - To manage multiple asynchronous scripts, a tasks directory is created and the task scripts are placed under it
  - Prepare task.py as follows

## Callee (= task)
from __future__ import absolute_import, unicode_literals
import time

from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10)  # sleep 10 seconds to confirm that the process runs asynchronously
    message = "hello"
    print(message)  # standard output
    return message  # return the result

Launch

- Run the Django application

$ python manage.py makemigrations  # create migration files (if needed)
$ python manage.py migrate         # run the migrations
$ python manage.py runserver       # start the application

- Launch Redis

# redis-server

- Launch Celery
  - cd to the directory containing manage.py and run the following command

$celery -A myapp worker --concurrency=1 -l info

Execute a Task and get its ID

- A task can be executed with [your_task_function].delay()
- In the example below, it simply waits 10 seconds and then returns "hello"
- To inspect the task later, you need the ID assigned to it
- hello.delay() returns a task object, from which the id can be read
- By storing the task ID in a DB or similar, you can check the task status at any time, from inside the application or from another process

from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id)  # this is all it takes to start the asynchronous task; it then runs in the background
    print("===============")
    print(task.id)  # the uniquely assigned ID

Check the status of Task

- Check the task status using the task ID obtained in the example above
- Use AsyncResult to check the status and result
  - Passing the task ID returns the task object
  - The status attribute gives the state of the task
    - PENDING: waiting for execution
    - STARTED: execution has started (not reported by default; how to enable it is described later)
    - SUCCESS: finished normally
    - FAILURE: finished abnormally

import time

from celery.result import AsyncResult

from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id)  # start the asynchronous task
    print("===============")
    task_id = task.id
    print(task_id)  # the uniquely assigned ID
    task_1 = AsyncResult(task_id)
    print(task_1.status)  # status immediately after starting
    time.sleep(1)
    print("===============")
    task_2 = AsyncResult(task_id)
    print(task_2.status)  # status 1 second after starting
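The polling pattern above can be observed without a broker at all. Here is a minimal pure-Python sketch, where `FakeResult` is a stand-in invented here for `celery.result.AsyncResult` and a thread plays the role of the worker:

```python
import threading
import time

class FakeResult:
    """Stand-in for AsyncResult: holds a status and a result."""
    def __init__(self):
        self.status = "PENDING"
        self.result = None

def run_task(res):
    # Mimic a worker picking up the task and finishing it.
    res.status = "STARTED"
    time.sleep(0.2)
    res.result = "hello"
    res.status = "SUCCESS"

res = FakeResult()
threading.Thread(target=run_task, args=(res,)).start()
statuses = [res.status]  # PENDING or STARTED right after starting
while res.status not in ("SUCCESS", "FAILURE"):
    time.sleep(0.05)     # poll until a terminal state is reached
statuses.append(res.status)
print(statuses, res.result)
```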

- Reading the result from the DB
  - Perhaps not used that often, but as another method:
  - With CELERY_RESULT_BACKEND set in settings.py, the status/result of asynchronous tasks is saved to the DB automatically (MySQL in this article)
  - You can look up task information from Django by filtering on the task ID

import time

from django_celery_results.models import TaskResult

from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id)  # start the asynchronous task
    print("===============")
    task_id = task.id  # the uniquely assigned ID
    print(task_id)
    task_model_1 = TaskResult.objects.filter(task_id=task_id)
    if task_model_1.exists():
        print(task_model_1[0].status)  # status immediately after starting
    time.sleep(1)
    task_model_2 = TaskResult.objects.filter(task_id=task_id)
    if task_model_2.exists():
        print(task_model_2[0].status)  # status 1 second after starting
Contents of MySQL

## The tables are now as follows
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai                 |
+----------------------------------+
| auth_group                       |
| auth_group_permissions           |
| auth_permission                  |
| django_admin_log                 |
| django_celery_results_taskresult |  ## <-This has been added!
...

## The schema of django_celery_results_taskresult is as follows
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field            | Type         | Null | Key | Default | Extra          |
+------------------+--------------+------+-----+---------+----------------+
| id               | int(11)      | NO   | PRI | NULL    | auto_increment |
| task_id          | varchar(255) | NO   | UNI | NULL    |                |
| status           | varchar(50)  | NO   | MUL | NULL    |                |
| content_type     | varchar(128) | NO   |     | NULL    |                |
| content_encoding | varchar(64)  | NO   |     | NULL    |                |
| result           | longtext     | YES  |     | NULL    |                |
| date_done        | datetime(6)  | NO   | MUL | NULL    |                |
| traceback        | longtext     | YES  |     | NULL    |                |
| meta             | longtext     | YES  |     | NULL    |                |
| task_args        | longtext     | YES  |     | NULL    |                |
| task_kwargs      | longtext     | YES  |     | NULL    |                |
| task_name        | varchar(255) | YES  | MUL | NULL    |                |
| worker           | varchar(100) | YES  | MUL | NULL    |                |
| date_created     | datetime(6)  | NO   | MUL | NULL    |                |
+------------------+--------------+------+-----+---------+----------------+

Prevent the Task state from staying PENDING

- If CELERY_TASK_TRACK_STARTED is not set in settings.py, then until the task finishes and the result is available:
  - the status obtained via AsyncResult remains PENDING
  - the task is not stored in the DB at all (a row is written only once the result is available)
- To fix this, add the following setting to settings.py

CELERY_TASK_TRACK_STARTED = True

Try making a Task fail

- If the task raises an exception, its status becomes FAILURE
- The resulting error message is stored in task.result

## Callee (= task)
from __future__ import absolute_import, unicode_literals
import time

from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10)  # sleep 10 seconds to confirm that the process runs asynchronously
    message = "hello"
    raise Exception("my error message")  # report an error
## The result stored in MySQL is as follows
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
              id: 31
         task_id: be294008-d2fc-4760-9055-483efdaa4970
          status: FAILURE
    content_type: application/json
content_encoding: utf-8
          result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
       date_done: 2020-11-10 08:06:32.848782
       traceback: Traceback (most recent call last):...
            meta: {"children": []}
       task_args: (4,)
     task_kwargs: {}
       task_name: myapp.tasks.task.hello
          worker: celery@05ab2e4b5ee1
    date_created: 2020-11-10 08:06:22.829301
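The shape of the result column can be reproduced with plain Python. The sketch below is illustrative only (serialize_failure is a name made up here, not Celery's actual serializer); it builds a payload matching the dump above:

```python
import json

def serialize_failure(exc):
    # Build a payload shaped like the `result` column above
    # (illustrative only, not Celery's real serialization code).
    return json.dumps({
        "exc_type": type(exc).__name__,
        "exc_message": [str(exc)],
        "exc_module": type(exc).__module__,
    })

try:
    raise Exception("my error message")
except Exception as e:
    payload = serialize_failure(e)

print(payload)
# → {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
```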
