Ich benutze oft die Kombination von Django + Redis (manchmal ElastiCashe) + Sellerie.
Ich mache es so.
Es ist einfach zu schreiben, aber wenn Sie wissen möchten, ob es geflossen ist, können Sie sich die Protokolldatei ansehen, wenn es nur einen Sellerie-Arbeiter gibt. Wenn es jedoch mehrere Sellerie-Arbeiter gibt, ist es schwierig herauszufinden, wo es geflossen ist.
Lassen Sie uns also das Ausführungsprotokoll von request-> success / fail im Modell von RDB löschen, damit es auf der Admin-Site bestätigt werden kann.
Selbst mit nur einem Sellerie-Arbeiter ist es natürlich einfacher, einer Protokolldatei zu folgen.
Ich habe bereits viele Sellerie-Aufgaben geschrieben, deshalb möchte ich es so einfach wie möglich machen.
Normalerweise beginnen Sie mit dem Schreiben von from Sellery import Task
, aber schreiben wir eine Task, die die Fehlerbehandlung unterstützt.
Der Aufrufer verwendet apply_async (), das nach delay () aufgerufen wird, um die Anforderung zu protokollieren. Die aufgerufene Seite protokolliert on_success (), wenn es normal ist, und on_failure (), wenn es abnormal ist.
Um es zu verwenden, ändern Sie die Vererbungsquelle wie SampleTask in Task-> BaseHandlingTask.
Übrigens, wenn Sie delay () asynchron machen und in run () ändern, wird nichts aufgezeichnet, weil es synchron ist.
app/tasks/base/handling.py
import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction
from app.models import JobState
class BaseHandlingTask(Task):
"""Basisaufgabe zur Fehlerbehandlung"""
logger = logging.getLogger('prj')
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
"""Apply tasks asynchronously by sending a message."""
async_result = None
try:
async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
except: #Redis ist nicht auf usw.
#Befindet sich der Anrufer innerhalb des Atomblocks, wird er wieder zusammengesetzt. Verwenden Sie daher die zweite Verbindung
with transaction.atomic(using='force'):
exc_type, exc_value, exc_traceback = sys.exc_info()
job_state = JobState() # task_keine ID
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.exception_class = exc_value.__class__.__name__
job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
job_state.traceback = traceback.format_exc()
job_state.save(using='force')
raise
#Wenn der Start erfolgreich ist-Redis ist auf. Sellerie ist auf / nicht auf(is_complete ==Bleibt falsch)
job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
job_state.task_name = async_result.task_name
job_state.name = async_result.task_name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.save()
return async_result
def on_success(self, retval, task_id, args, kwargs):
"""Success handler -Handler zur normalen Zeit-Auf der Seite der Sellerie-Arbeiter angerufen"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.is_complete = True
job_state.save()
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Error handler -Handler bei Fehler-Auf der Seite der Sellerie-Arbeiter angerufen
- run()Selbst innerhalb des Atomblocks von wird der Datensatz nicht zurückgesetzt, da er vom Worker separat aufgerufen wird.
"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.exception_class = exc.__class__.__name__
job_state.exception_msg = str(exc)
job_state.traceback = str(einfo)
job_state.save()
def run(self, *args, **kwargs):
"""The body of the task executed by workers."""
raise NotImplementedError('Tasks must define the run method.')
class SampleTask(BaseHandlingTask):
"""Beispielaufgabe"""
logger = logging.getLogger('prj')
def run(self, is_error=False):
self.logger.info('SampleTask start...')
with transaction.atomic():
if is_error:
raise ValueError('Es ist ein Fehler')
self.logger.info('SampleTask end.')
Dies ist das Modell, an das das Protokoll ausgegeben wird.
Du kannst sehen.
app/models/job_state.py
from django.db import models
class JobState(models.Model):
"""Beruflicher Status"""
task_id = models.CharField('Aufgaben-ID', max_length=255, blank=True, null=True, db_index=True) # UUID
task_name = models.CharField('Aufgabennname', max_length=255, blank=True, null=True) #Beispiel: app.tasks.handling.SampleTask
name = models.CharField('Name der Klasse', max_length=255, blank=True, null=True) #Beispiel: SampleTask
args = models.TextField('args', null=True, blank=True)
kwargs = models.TextField('kwargs', null=True, blank=True)
is_complete = models.BooleanField('Erledigt', default=False)
origin = models.CharField('origin', max_length=255, blank=True, null=True) # Name of host that sent this task.
hostname = models.CharField('hostname', max_length=255, blank=True, null=True) # Node name of the worker instance executing the task.
exception_class = models.CharField('Ausnahmeklasse', max_length=255, null=True, blank=True, default='')
exception_msg = models.CharField('Ausnahmemeldung', max_length=255, null=True, blank=True, default='')
traceback = models.TextField('traceback', null=True, blank=True, default='')
created_at = models.DateTimeField('Eingetragenes Datum', auto_now_add=True, blank=True, null=True)
updated_at = models.DateTimeField('Datum und Uhrzeit aktualisieren', auto_now=True, blank=True, null=True)
def __str__(self):
return self.task_id if self.task_id else str(self.id)
Machen Sie JobState auf der Admin-Site sichtbar. Wenn es so belassen wird, wie es ist, wird es länger, so dass es an einigen Stellen verkürzt werden kann.
app/admin.py
from django.contrib import admin
from app.models import JobState
class JobStateAdmin(admin.ModelAdmin):
"""Beruflicher Status"""
list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
list_display_links = ('id', 'task_id_shorten', 'name')
list_filter = ('is_complete',)
search_fields = ['task_id', 'task_name', 'name']
def task_id_shorten(self, obj):
return obj.task_id[:8] + '...' if obj.task_id else ''
task_id_shorten.short_description = 'Aufgaben-ID'
def x_args(self, obj):
return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
x_args.short_description = 'args'
def x_kwargs(self, obj):
return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
x_kwargs.short_description = 'kwargs'
admin.site.register(JobState, JobStateAdmin)
settings
Wenn der Aufrufer transaction.atomic () ist, wird JobState im Falle eines Fehlers ebenfalls zurückgesetzt. Dies ist ein bisschen brutal, aber wir erhöhen eine weitere Datenbankverbindung für erzwungene Schreibvorgänge.
Wenn Sie nicht so weit gehen müssen, können Sie es tun.
Fügen Sie unter DATABASE = {} Folgendes hinzu
prj/settings/local.py
#Stellen Sie eine zweite Verbindung mit denselben Einstellungen her, damit Sie in JobState schreiben können, auch wenn Sie im atmic-Block eine Ausnahme machen
DATABASES.update({'force': DATABASES['default']})
Es ist ein grober Code, aber jetzt können Sie ihn auf einen Blick sehen.
Wenn es zu viel ist, um die normale Zeit anzugeben, kommentieren Sie den Ort aus, der in der normalen Zeit ausgegeben wird.
Da zu viele Cron-Jobs (Sellerie-Beats) in 15-Minuten-Einheiten ausgeführt werden, wird die BaseHandlingTask nicht an erster Stelle vererbt, sondern die einfache Aufgabe.
Bitte anpassen mit etc.
Recommended Posts