[PYTHON] Flux d'obtention du résultat du traitement asynchrone à l'aide de Django et Celery

Aperçu

-Traitement asynchrone en combinant django et celery Flux jusqu'à l'enregistrement du résultat de l'utilisation de django-celery-results


supposition

--Environnement - python: v.3.7.7 - redis: v.6.0.9


Installation

--Modifier settings.py

INSTALLED_APPS = [
    ...
    'django_celery_results', #ajouter à
]

CELERY_BROKER_URL = "redis:// [redis host]:6379" #Spécifiez redis comme courtier
CELERY_RESULT_BACKEND = "django-db" #Le résultat est enregistré dans la base de données spécifiée par django. Cet article suppose MySQL.
CELERY_TASK_TRACK_STARTED = True #Paramètres de confirmation du démarrage de la tâche (décrits plus loin)

Construire

├── myapp
│   ├── myapp
│   │   ├── __init__.py
│   │   ├── settings.py
│   │   ├── celery.py
│   │   ├── urls.py
│   │   ├── tasks
│   │   │   ├── __init__.py
│   │   │   └── task.py
│   │   └── wsgi.py
│   └── manage.py
├── docker-compose.yml
└── requirements.txt

--Créer celery.py --Créez le celery.py suivant sous le répertoire de l'application (même niveau que urls.py) (presque le même que le document officiel)

# celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'myapp.settings'
)

app = Celery('myapp') 

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(
    'django.conf:settings',
    namespace='CELERY'
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

--Créer un fichier de tâche

##Appelé(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #10 secondes pour confirmer que le processus se termine de manière asynchrone
    message = "hello"
    print(message) #Sortie standard
    return message #Renvoie le résultat

lancement

--Exécuter l'application Django

$python manage.py makemigrations #Créez un fichier de migration (si nécessaire)
$python manage.py migrate #exécuter migrer
$python manage.py rumserver #Exécution de l'application
# redis-server
$celery -A myapp worker --concurrency=1 -l info

Exécuter la tâche pour obtenir l'ID

from django_app.tasks import hello
from celery.result import AsyncResult

def hoge():
    task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
    print("===============")
    print(task.id) #Vous pouvez vérifier l'ID attribué de manière unique.

Vérifiez l'état de la tâche

--Vérifiez l'état de la tâche à l'aide de l'ID de tâche obtenu dans l'exemple ci-dessus. --Utilisez AsyncResult pour vérifier l'état et le résultat

from django_app.tasks import hello
from celery.result import AsyncResult

def hoge():
    task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
    print("===============")
    task_id = task.id
    print(task_id) #Vous pouvez vérifier l'ID attribué de manière unique.
    task_1 = AsyncResult(task_id)
    print(task_1.status) #Vérifiez l'état immédiatement après le début du traitement
    time.sleep(1)
    print("===============")
    task_2 = AsyncResult(task_id)
    print(task_2.status) #Vérifiez l'état 1 seconde après le traitement

--Enregistrer le résultat dans DB «Je ne pense pas que vous l'utilisiez beaucoup, mais comme méthode. ..

from django_app.tasks import hello
from django_celery_results.models import TaskResult

def hoge():
    task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
    print("===============")
    task_id = task.id #Vous pouvez vérifier l'ID attribué de manière unique.
    print(task_id) 
    task_model_1 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_1) > 0:
        print(task_model_1[0].status) #Vérifiez l'état immédiatement après le début du traitement
    time.sleep(1)
    task_model_2 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_2) > 0:
        print(task_model_2[0].status) #Vérifiez l'état 1 seconde après le traitement
#Contenu de MySQL

##Le tableau est le suivant.
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai                 |
+----------------------------------+
| auth_group                       |
| auth_group_permissions           |
| auth_permission                  |
| django_admin_log                 |
| django_celery_results_taskresult |  ## <-Cela a été ajouté!
...

## django_celery_results_Le schéma de résultat de la tâche est le suivant.
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field            | Type         | Null | Key | Default | Extra          |
+------------------+--------------+------+-----+---------+----------------+
| id               | int(11)      | NO   | PRI | NULL    | auto_increment |
| task_id          | varchar(255) | NO   | UNI | NULL    |                |
| status           | varchar(50)  | NO   | MUL | NULL    |                |
| content_type     | varchar(128) | NO   |     | NULL    |                |
| content_encoding | varchar(64)  | NO   |     | NULL    |                |
| result           | longtext     | YES  |     | NULL    |                |
| date_done        | datetime(6)  | NO   | MUL | NULL    |                |
| traceback        | longtext     | YES  |     | NULL    |                |
| meta             | longtext     | YES  |     | NULL    |                |
| task_args        | longtext     | YES  |     | NULL    |                |
| task_kwargs      | longtext     | YES  |     | NULL    |                |
| task_name        | varchar(255) | YES  | MUL | NULL    |                |
| worker           | varchar(100) | YES  | MUL | NULL    |                |
| date_created     | datetime(6)  | NO   | MUL | NULL    |                |
+------------------+--------------+------+-----+---------+----------------+

Empêcher l'état de la tâche de rester PENDING

--Si CELERY_TASK_TRACK_STARTED n'est pas spécifié dans settings.py, la tâche sera exécutée jusqu'à ce que le résultat soit obtenu.

CELERY_TASK_TRACK_STARTED = True

Essayez d'échouer la tâche

--Si vous soulevez une erreur avec relance, le résultat de la tâche sera FAILED

##Appelé(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #10 secondes pour confirmer que le processus se termine de manière asynchrone
    message = "hello"
    raise Exception("my error message") #Notification d'erreur
#Résultats stockés dans MySQL
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
              id: 31
         task_id: be294008-d2fc-4760-9055-483efdaa4970
          status: FAILURE
    content_type: application/json
content_encoding: utf-8
          result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
       date_done: 2020-11-10 08:06:32.848782
       traceback: Traceback (most recent call last):...
            meta: {"children": []}
       task_args: (4,)
     task_kwargs: {}
       task_name: myapp.tasks.task.hello
          worker: celery@05ab2e4b5ee1
    date_created: 2020-11-10 08:06:22.829301

Recommended Posts

Flux d'obtention du résultat du traitement asynchrone à l'aide de Django et Celery
J'ai essayé le serveur asynchrone de Django 3.0
Implémentation du traitement asynchrone dans Django (Celery, Redis)
Organisez le flux lors de l'exécution de Django en utilisant NGINX et gunicorn (mémo personnel)
Sortie exclusive de l'application Django utilisant ngrok
Afficher le résultat du traitement de la géométrie en Python
Le processus d'installation d'Atom et de l'exécution de Python
Traitement asynchrone de Python ~ Comprenez parfaitement async et attendez ~
[Django 2.2] Trier et obtenir la valeur de la destination de la relation
Remarque DJango: depuis le début (Simplification et fractionnement d'URLConf)
Prise en main et utilisation d'exemples de vues génériques basées sur des classes dans Django
100 traitement de la langue knock-42: Affichage de la phrase de la personne concernée et de la personne concernée
Grattage du résultat de "Schedule-kun"
Traitement asynchrone du céleri dans Flask
Trouvez le chemin critique de PERT en utilisant la recherche de priorité de largeur et la recherche de priorité de profondeur
Comprendre la fonction de convolution en utilisant le traitement d'image comme exemple
Probabilité des prix les plus élevés et les plus bas des louveteaux à Atsumori
Obtenez et définissez la valeur du menu déroulant en utilisant Python et Selenium
Convertissez le résultat de python optparse en dict et utilisez-le
Utilisez le SDK Qore pour prédire les augmentations et les baisses de prix BTC
[Word2vec] Visualisons le résultat du traitement en langage naturel des avis des entreprises
for, continue, break Expliquer le flux du traitement itératif dans Python3-Part 1
L'histoire de Python et l'histoire de NaN
Avantages et exemples d'utilisation de Rabbit Mq
Prenez le journal d'exécution du céleri
Traiter le résultat de% time,% timeit
J'ai essayé le traitement asynchrone en utilisant asyncio
100 traitement du langage knock-93 (en utilisant des pandas): calcul du taux de précision de la tâche d'analogie
J'ai essayé d'obtenir les résultats de Hachinai en utilisant le traitement d'image
Affichez le résultat de l'analyse vidéo à l'aide de l'API Cloud Video Intelligence de Colaboratory.
J'ai essayé d'extraire et d'illustrer l'étape de l'histoire à l'aide de COTOHA
Recevez des nouvelles de chacune des trois principales entreprises de téléphonie mobile en utilisant Django et l'API News
Obtenez et estimez la forme de la tête en utilisant Dlib et OpenCV avec python
A propos des principales tâches de traitement d'image (vision par ordinateur) et de l'architecture utilisée
L'histoire de l'utilisation de mysqlclient car PyMySQL ne peut pas être utilisé avec Django 2.2