-Asynchrone Verarbeitung durch Kombination von django und Sellerie Fließen Sie, bis Sie das Ergebnis der Verwendung von [django-cellery-results] gespeichert haben (https://github.com/celery/django-celery-results).
--Umgebung - python: v.3.7.7 - redis: v.6.0.9
--Edit settings.py
INSTALLED_APPS = [
...
'django_celery_results', #hinzufügen
]
CELERY_BROKER_URL = "redis:// [redis host]:6379" #Geben Sie redis als Broker an
CELERY_RESULT_BACKEND = "django-db" #Das Ergebnis wird in der von django angegebenen Datenbank gespeichert. Dieser Artikel setzt MySQL voraus.
CELERY_TASK_TRACK_STARTED = True #Einstellungen zur Bestätigung, dass die Aufgabe gestartet wurde (später beschrieben)
├── myapp
│ ├── myapp
│ │ ├── __init__.py
│ │ ├── settings.py
│ │ ├── celery.py
│ │ ├── urls.py
│ │ ├── tasks
│ │ │ ├── __init__.py
│ │ │ └── task.py
│ │ └── wsgi.py
│ └── manage.py
├── docker-compose.yml
└── requirements.txt
--Cellerie.py erstellen
--Erstellen Sie die folgende celery.py
unter dem Anwendungsverzeichnis (dieselbe Ebene wie urls.py) (fast dieselbe wie das offizielle Dokument)
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE',
'myapp.settings'
)
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object(
'django.conf:settings',
namespace='CELERY'
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
--Aufgabendatei erstellen
task.py
wie folgt vor##Angerufene(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 Sekunden, um zu bestätigen, dass der Prozess asynchron beendet wird
message = "hello"
print(message) #Standardausgabe
return message #Gibt das Ergebnis zurück
$python manage.py makemigrations #Erstellen Sie eine Migrationsdatei (falls erforderlich)
$python manage.py migrate #Führen Sie die Migration aus
$python manage.py rumserver #Anwendungsausführung
# redis-server
$celery -A myapp worker --concurrency=1 -l info
[your_task_function] .delay ()
erfolgenfrom django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
print("===============")
print(task.id) #Sie können die eindeutig zugewiesene ID überprüfen.
--Überprüfen Sie den Aufgabenstatus anhand der im obigen Beispiel erhaltenen Aufgaben-ID.
--Verwenden Sie AsyncResult
, um den Status und das Ergebnis zu überprüfen
PENDING
: Warten auf Ausführung.
--STARTED
: Ausführungsstartstatus. (Es wird nicht standardmäßig ausgegeben. Die Ausgabemethode wird später beschrieben.)
--SUCCESS
: Die Ausführung wurde normal abgeschlossen.
--FAILED
: Die Ausführung wurde abnormal beendet.from django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
print("===============")
task_id = task.id
print(task_id) #Sie können die eindeutig zugewiesene ID überprüfen.
task_1 = AsyncResult(task_id)
print(task_1.status) #Überprüfen Sie den Status sofort nach Beginn der Verarbeitung
time.sleep(1)
print("===============")
task_2 = AsyncResult(task_id)
print(task_2.status) #Überprüfen Sie den Status 1 Sekunde nach der Verarbeitung
from django_app.tasks import hello
from django_celery_results.models import TaskResult
def hoge():
task = hello.delay(project_id) #Dies ist alles, was Sie zum Aufrufen der asynchronen Verarbeitung benötigen. Danach läuft der Prozess asynchron ab
print("===============")
task_id = task.id #Sie können die eindeutig zugewiesene ID überprüfen.
print(task_id)
task_model_1 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_1) > 0:
print(task_model_1[0].status) #Überprüfen Sie den Status sofort nach Beginn der Verarbeitung
time.sleep(1)
task_model_2 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_2) > 0:
print(task_model_2[0].status) #Überprüfen Sie den Status 1 Sekunde nach der Verarbeitung
#Inhalt von MySQL
##Die Tabelle ist wie folgt.
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai |
+----------------------------------+
| auth_group |
| auth_group_permissions |
| auth_permission |
| django_admin_log |
| django_celery_results_taskresult | ## <-Dies wurde hinzugefügt!
...
## django_celery_results_Das Aufgabenergebnisschema lautet wie folgt.
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| task_id | varchar(255) | NO | UNI | NULL | |
| status | varchar(50) | NO | MUL | NULL | |
| content_type | varchar(128) | NO | | NULL | |
| content_encoding | varchar(64) | NO | | NULL | |
| result | longtext | YES | | NULL | |
| date_done | datetime(6) | NO | MUL | NULL | |
| traceback | longtext | YES | | NULL | |
| meta | longtext | YES | | NULL | |
| task_args | longtext | YES | | NULL | |
| task_kwargs | longtext | YES | | NULL | |
| task_name | varchar(255) | YES | MUL | NULL | |
| worker | varchar(100) | YES | MUL | NULL | |
| date_created | datetime(6) | NO | MUL | NULL | |
+------------------+--------------+------+-----+---------+----------------+
--Wenn CELERY_TASK_TRACK_STARTED
in settings.py nicht angegeben ist, wird die Task ausgeführt, bis das Ergebnis erhalten wird.
settings.py
gesetztCELERY_TASK_TRACK_STARTED = True
task.result
gespeichert##Angerufene(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 Sekunden, um zu bestätigen, dass der Prozess asynchron beendet wird
message = "hello"
raise Exception("my error message") #Fehlerbenachrichtigung
#In MySQL gespeicherte Ergebnisse
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
id: 31
task_id: be294008-d2fc-4760-9055-483efdaa4970
status: FAILURE
content_type: application/json
content_encoding: utf-8
result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
date_done: 2020-11-10 08:06:32.848782
traceback: Traceback (most recent call last):...
meta: {"children": []}
task_args: (4,)
task_kwargs: {}
task_name: myapp.tasks.task.hello
worker: celery@05ab2e4b5ee1
date_created: 2020-11-10 08:06:22.829301
Recommended Posts