[PYTHON] Ablauf des Ergebnisses der asynchronen Verarbeitung mit Django und Sellerie

Überblick

-Asynchrone Verarbeitung durch Kombination von django und Sellerie Fließen Sie, bis Sie das Ergebnis der Verwendung von [django-cellery-results] gespeichert haben (https://github.com/celery/django-celery-results).


Annahme

--Umgebung - python: v.3.7.7 - redis: v.6.0.9


Installation

--Edit settings.py

INSTALLED_APPS = [
    ...
    'django_celery_results', #hinzufügen
]

CELERY_BROKER_URL = "redis:// [redis host]:6379" #Geben Sie redis als Broker an
CELERY_RESULT_BACKEND = "django-db" #Das Ergebnis wird in der von django angegebenen Datenbank gespeichert. Dieser Artikel setzt MySQL voraus.
CELERY_TASK_TRACK_STARTED = True #Einstellungen zur Bestätigung, dass die Aufgabe gestartet wurde (später beschrieben)

Bauen

├── myapp
│   ├── myapp
│   │   ├── __init__.py
│   │   ├── settings.py
│   │   ├── celery.py
│   │   ├── urls.py
│   │   ├── tasks
│   │   │   ├── __init__.py
│   │   │   └── task.py
│   │   └── wsgi.py
│   └── manage.py
├── docker-compose.yml
└── requirements.txt

--Cellerie.py erstellen --Erstellen Sie die folgende celery.py unter dem Anwendungsverzeichnis (dieselbe Ebene wie urls.py) (fast dieselbe wie das offizielle Dokument)

# celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'myapp.settings'
)

app = Celery('myapp') 

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(
    'django.conf:settings',
    namespace='CELERY'
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

--Aufgabendatei erstellen

##Angerufene(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #10 Sekunden, um zu bestätigen, dass der Prozess asynchron beendet wird
    message = "hello"
    print(message) #Standardausgabe
    return message #Gibt das Ergebnis zurück

Starten

$python manage.py makemigrations #Erstellen Sie eine Migrationsdatei (falls erforderlich)
$python manage.py migrate #Führen Sie die Migration aus
$python manage.py rumserver #Anwendungsausführung
# redis-server
$celery -A myapp worker --concurrency=1 -l info

Führen Sie die Aufgabe aus, um die ID zu erhalten

from django_app.tasks import hello
from celery.result import AsyncResult

def hoge():
    task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
    print("===============")
    print(task.id) #Sie können die eindeutig zugewiesene ID überprüfen.

Überprüfen Sie den Status der Aufgabe

--Überprüfen Sie den Aufgabenstatus anhand der im obigen Beispiel erhaltenen Aufgaben-ID. --Verwenden Sie AsyncResult, um den Status und das Ergebnis zu überprüfen

from django_app.tasks import hello
from celery.result import AsyncResult

def hoge():
    task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
    print("===============")
    task_id = task.id
    print(task_id) #Sie können die eindeutig zugewiesene ID überprüfen.
    task_1 = AsyncResult(task_id)
    print(task_1.status) #Überprüfen Sie den Status sofort nach Beginn der Verarbeitung
    time.sleep(1)
    print("===============")
    task_2 = AsyncResult(task_id)
    print(task_2.status) #Überprüfen Sie den Status 1 Sekunde nach der Verarbeitung
from django_app.tasks import hello
from django_celery_results.models import TaskResult

def hoge():
    task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
    print("===============")
    task_id = task.id #Sie können die eindeutig zugewiesene ID überprüfen.
    print(task_id) 
    task_model_1 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_1) > 0:
        print(task_model_1[0].status) #Überprüfen Sie den Status sofort nach Beginn der Verarbeitung
    time.sleep(1)
    task_model_2 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_2) > 0:
        print(task_model_2[0].status) #Überprüfen Sie den Status 1 Sekunde nach der Verarbeitung
#Inhalt von MySQL

##Die Tabelle ist wie folgt.
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai                 |
+----------------------------------+
| auth_group                       |
| auth_group_permissions           |
| auth_permission                  |
| django_admin_log                 |
| django_celery_results_taskresult |  ## <-Dies wurde hinzugefügt!
...

## django_celery_results_Das Aufgabenergebnisschema lautet wie folgt.
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field            | Type         | Null | Key | Default | Extra          |
+------------------+--------------+------+-----+---------+----------------+
| id               | int(11)      | NO   | PRI | NULL    | auto_increment |
| task_id          | varchar(255) | NO   | UNI | NULL    |                |
| status           | varchar(50)  | NO   | MUL | NULL    |                |
| content_type     | varchar(128) | NO   |     | NULL    |                |
| content_encoding | varchar(64)  | NO   |     | NULL    |                |
| result           | longtext     | YES  |     | NULL    |                |
| date_done        | datetime(6)  | NO   | MUL | NULL    |                |
| traceback        | longtext     | YES  |     | NULL    |                |
| meta             | longtext     | YES  |     | NULL    |                |
| task_args        | longtext     | YES  |     | NULL    |                |
| task_kwargs      | longtext     | YES  |     | NULL    |                |
| task_name        | varchar(255) | YES  | MUL | NULL    |                |
| worker           | varchar(100) | YES  | MUL | NULL    |                |
| date_created     | datetime(6)  | NO   | MUL | NULL    |                |
+------------------+--------------+------+-----+---------+----------------+

Verhindern Sie, dass der Task-Status PENDING bleibt

--Wenn CELERY_TASK_TRACK_STARTED in settings.py nicht angegeben ist, wird die Task ausgeführt, bis das Ergebnis erhalten wird.

CELERY_TASK_TRACK_STARTED = True

Versuchen Sie, die Aufgabe zu scheitern

##Angerufene(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #10 Sekunden, um zu bestätigen, dass der Prozess asynchron beendet wird
    message = "hello"
    raise Exception("my error message") #Fehlerbenachrichtigung
#In MySQL gespeicherte Ergebnisse
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
              id: 31
         task_id: be294008-d2fc-4760-9055-483efdaa4970
          status: FAILURE
    content_type: application/json
content_encoding: utf-8
          result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
       date_done: 2020-11-10 08:06:32.848782
       traceback: Traceback (most recent call last):...
            meta: {"children": []}
       task_args: (4,)
     task_kwargs: {}
       task_name: myapp.tasks.task.hello
          worker: celery@05ab2e4b5ee1
    date_created: 2020-11-10 08:06:22.829301

Recommended Posts

Ablauf des Ergebnisses der asynchronen Verarbeitung mit Django und Sellerie
Ich habe den asynchronen Server von Django 3.0 ausprobiert
Implementierung der asynchronen Verarbeitung in Django (Sellerie, Redis)
Organisieren Sie den Fluss, wenn Sie Django mit NGINX und Gunicorn ausführen (persönliches Memo).
Exklusive Veröffentlichung der Django App mit ngrok
Zeigen Sie das Ergebnis der Geometrieverarbeitung in Python an
Der Prozess der Installation von Atom und der Ausführung von Python
Asynchrone Verarbeitung von Python ~ Asynchron vollständig verstehen und warten ~
[Django 2.2] Sortieren und erhalten Sie den Wert des Beziehungsziels
DJango Hinweis: Von Anfang an (Vereinfachung und Aufteilung von URLConf)
Erste Schritte und Verwendung von auf Klassenklassen basierenden generischen Ansichten in Django
100 Sprachverarbeitung Knock-42: Anzeige der Phrase der betroffenen Person und der betroffenen Person
Scraping das Ergebnis von "Schedule-Kun"
Sellerie asynchrone Verarbeitung in Flask
Finden Sie den kritischen Pfad von PERT mithilfe der Breitenprioritätssuche und der Tiefenprioritätssuche
Verstehen Sie die Funktion der Faltung am Beispiel der Bildverarbeitung
Wahrscheinlichkeit der höchsten und niedrigsten Jungtierpreise in Atsumori
Rufen Sie den Wert des Dropdown-Menüs mit Python und Selen ab und legen Sie ihn fest
Konvertieren Sie das Ergebnis von Python Optparse, um es zu diktieren und zu verwenden
Verwenden Sie das Qore SDK, um BTC-Preiserhöhungen und -senkungen vorherzusagen
[Word2vec] Lassen Sie uns das Ergebnis der Verarbeitung von Unternehmensbewertungen in natürlicher Sprache visualisieren
for, continue, break Erläutern Sie den Ablauf der iterativen Verarbeitung in Python3-Teil 1
Die Geschichte von Python und die Geschichte von NaN
Vorteile und Beispiele für die Verwendung von Rabbit Mq
Nimm das Ausführungsprotokoll von Sellerie
Verarbeiten Sie das Ergebnis von% time,% timeit
Ich habe versucht, asynchrone Verarbeitung mit Asyncio
100 Sprachverarbeitung Knock-93 (unter Verwendung von Pandas): Berechnung der Genauigkeitsrate der Analogie-Aufgabe
Ich habe versucht, die Trefferergebnisse von Hachinai mithilfe der Bildverarbeitung zu erhalten
Zeigen Sie das Ergebnis der Videoanalyse mit der Cloud Video Intelligence API von Colaboratory an.
Ich habe versucht, die Phase der Geschichte mit COTOHA zu extrahieren und zu veranschaulichen
Erhalten Sie Nachrichten von jedem der drei großen Mobilfunkunternehmen mithilfe von Django und der Nachrichten-API
Erhalten und schätzen Sie die Form des Kopfes mit Dlib und OpenCV mit Python
Über die Hauptaufgaben der Bildverarbeitung (Computer Vision) und die verwendete Architektur
Die Geschichte der Verwendung von mysqlclient, da PyMySQL nicht mit Django 2.2 verwendet werden kann