[PYTHON] Nimm das Ausführungsprotokoll von Sellerie

Ursprung

Ich benutze oft die Kombination von Django + Redis (manchmal ElastiCashe) + Sellerie.

Ich mache es so.

Es ist einfach zu schreiben, aber wenn Sie wissen möchten, ob es geflossen ist, können Sie sich die Protokolldatei ansehen, wenn es nur einen Sellerie-Arbeiter gibt. Wenn es jedoch mehrere Sellerie-Arbeiter gibt, ist es schwierig herauszufinden, wo es geflossen ist.

Ich möchte wissen, was ich fragen soll und was schief gelaufen ist

Lassen Sie uns also das Ausführungsprotokoll von request-> success / fail im Modell von RDB löschen, damit es auf der Admin-Site bestätigt werden kann.

Selbst mit nur einem Sellerie-Arbeiter ist es natürlich einfacher, einer Protokolldatei zu folgen.

Ich habe bereits viele Sellerie-Aufgaben geschrieben, deshalb möchte ich es so einfach wie möglich machen.

Vererbungsquelle Aufgabe

Normalerweise beginnen Sie mit dem Schreiben von from Sellery import Task, aber schreiben wir eine Task, die die Fehlerbehandlung unterstützt.

Der Aufrufer verwendet apply_async (), das nach delay () aufgerufen wird, um die Anforderung zu protokollieren. Die aufgerufene Seite protokolliert on_success (), wenn es normal ist, und on_failure (), wenn es abnormal ist.

Um es zu verwenden, ändern Sie die Vererbungsquelle wie SampleTask in Task-> BaseHandlingTask.

Übrigens, wenn Sie delay () asynchron machen und in run () ändern, wird nichts aufgezeichnet, weil es synchron ist.

app/tasks/base/handling.py

import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction

from app.models import JobState


class BaseHandlingTask(Task):
    """Basisaufgabe zur Fehlerbehandlung"""
    logger = logging.getLogger('prj')

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
        """Apply tasks asynchronously by sending a message."""
        async_result = None
        try:
            async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
        except:     #Redis ist nicht auf usw.
            #Befindet sich der Anrufer innerhalb des Atomblocks, wird er wieder zusammengesetzt. Verwenden Sie daher die zweite Verbindung
            with transaction.atomic(using='force'):
                exc_type, exc_value, exc_traceback = sys.exc_info()
                job_state = JobState()  # task_keine ID
                job_state.task_name = self.name
                job_state.name = self.name.split('.')[-1]
                if args:
                    job_state.args = json.dumps(list(args))
                if kwargs:
                    job_state.kwargs = json.dumps(kwargs)
                job_state.origin = socket.gethostname()
                job_state.exception_class = exc_value.__class__.__name__
                job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
                job_state.traceback = traceback.format_exc()
                job_state.save(using='force')
            raise

        #Wenn der Start erfolgreich ist-Redis ist auf. Sellerie ist auf / nicht auf(is_complete ==Bleibt falsch)
        job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
        job_state.task_name = async_result.task_name
        job_state.name = async_result.task_name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        job_state.origin = socket.gethostname()
        job_state.save()

        return async_result

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler -Handler zur normalen Zeit-Auf der Seite der Sellerie-Arbeiter angerufen"""
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.is_complete = True
        job_state.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler -Handler bei Fehler-Auf der Seite der Sellerie-Arbeiter angerufen
            - run()Selbst innerhalb des Atomblocks von wird der Datensatz nicht zurückgesetzt, da er vom Worker separat aufgerufen wird.
        """
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.exception_class = exc.__class__.__name__
        job_state.exception_msg = str(exc)
        job_state.traceback = str(einfo)
        job_state.save()

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')


class SampleTask(BaseHandlingTask):
    """Beispielaufgabe"""
    logger = logging.getLogger('prj')

    def run(self, is_error=False):
        self.logger.info('SampleTask start...')
        with transaction.atomic():
            if is_error:
                raise ValueError('Es ist ein Fehler')
        self.logger.info('SampleTask end.')

JobState-Modell

Dies ist das Modell, an das das Protokoll ausgegeben wird.

Du kannst sehen.

app/models/job_state.py

from django.db import models


class JobState(models.Model):
    """Beruflicher Status"""
    task_id = models.CharField('Aufgaben-ID', max_length=255, blank=True, null=True, db_index=True)   # UUID
    task_name = models.CharField('Aufgabennname', max_length=255, blank=True, null=True)     #Beispiel: app.tasks.handling.SampleTask
    name = models.CharField('Name der Klasse', max_length=255, blank=True, null=True)          #Beispiel: SampleTask
    args = models.TextField('args', null=True, blank=True)
    kwargs = models.TextField('kwargs', null=True, blank=True)
    is_complete = models.BooleanField('Erledigt', default=False)
    origin = models.CharField('origin', max_length=255, blank=True, null=True)  # Name of host that sent this task.
    hostname = models.CharField('hostname', max_length=255, blank=True, null=True)  # Node name of the worker instance executing the task.
    exception_class = models.CharField('Ausnahmeklasse', max_length=255, null=True, blank=True, default='')
    exception_msg = models.CharField('Ausnahmemeldung', max_length=255, null=True, blank=True, default='')
    traceback = models.TextField('traceback', null=True, blank=True, default='')

    created_at = models.DateTimeField('Eingetragenes Datum', auto_now_add=True, blank=True, null=True)
    updated_at = models.DateTimeField('Datum und Uhrzeit aktualisieren', auto_now=True, blank=True, null=True)

    def __str__(self):
        return self.task_id if self.task_id else str(self.id)

Registrierung der Admin-Site

Machen Sie JobState auf der Admin-Site sichtbar. Wenn es so belassen wird, wie es ist, wird es länger, so dass es an einigen Stellen verkürzt werden kann.

app/admin.py

from django.contrib import admin
from app.models import JobState

class JobStateAdmin(admin.ModelAdmin):
    """Beruflicher Status"""
    list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
    list_display_links = ('id', 'task_id_shorten', 'name')
    list_filter = ('is_complete',)
    search_fields = ['task_id', 'task_name', 'name']

    def task_id_shorten(self, obj):
        return obj.task_id[:8] + '...' if obj.task_id else ''
    task_id_shorten.short_description = 'Aufgaben-ID'

    def x_args(self, obj):
        return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
    x_args.short_description = 'args'

    def x_kwargs(self, obj):
        return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
    x_kwargs.short_description = 'kwargs'

admin.site.register(JobState, JobStateAdmin)

settings

Wenn der Aufrufer transaction.atomic () ist, wird JobState im Falle eines Fehlers ebenfalls zurückgesetzt. Dies ist ein bisschen brutal, aber wir erhöhen eine weitere Datenbankverbindung für erzwungene Schreibvorgänge.

Wenn Sie nicht so weit gehen müssen, können Sie es tun.

Fügen Sie unter DATABASE = {} Folgendes hinzu

prj/settings/local.py

#Stellen Sie eine zweite Verbindung mit denselben Einstellungen her, damit Sie in JobState schreiben können, auch wenn Sie im atmic-Block eine Ausnahme machen
DATABASES.update({'force': DATABASES['default']})

das ist alles

Es ist ein grober Code, aber jetzt können Sie ihn auf einen Blick sehen.

Wenn es zu viel ist, um die normale Zeit anzugeben, kommentieren Sie den Ort aus, der in der normalen Zeit ausgegeben wird.

Da zu viele Cron-Jobs (Sellerie-Beats) in 15-Minuten-Einheiten ausgeführt werden, wird die BaseHandlingTask nicht an erster Stelle vererbt, sondern die einfache Aufgabe.

Bitte anpassen mit etc.

Recommended Posts

Nimm das Ausführungsprotokoll von Sellerie
Einstellung zur Ausgabe des Protokolls zur Ausführung von cron
Nehmen Sie die logarithmische Darstellung von Nicht-Null-Elementen in scipy.sparse
Erstellen Sie eine Ausführungsumgebung für Jupyter Lab
Visualisieren Sie die Exportdaten des Piyo-Protokolls
Bereiten Sie die Ausführungsumgebung von Python3 mit Docker vor
Die Ungenauigkeit von Tensorflow war auf log (0) zurückzuführen.
Von der Einführung von Pyethapp bis zur Vertragsabwicklung
Der Beginn von cif2cell
Ich möchte das Ausführungsergebnis von strace erfassen
Die Bedeutung des Selbst
Werfen Sie einen Blick auf die Verarbeitung von LightGBM Tuner
Machen Sie LCD-Screenshots mit Python-LEGO Mindstorms
der Zen von Python
Nehmen Sie den Wert des SwitchBot-Thermo-Hygrometers mit Raspberry Pi
Die Geschichte von sys.path.append ()
Umschalten der Bot-Thermo-Hygrometer-Werte mit Raspberry Pi
Vorbereiten der Ausführungsumgebung von PyTorch mit Docker November 2019
Reproduzieren Sie das Ausführungsbeispiel von Kapitel 4 von Hajipata in Python
Messung der Ausführungszeit
Reproduzieren Sie das Ausführungsbeispiel von Kapitel 5 von Hajipata in Python
Rache der Typen: Rache der Typen
Lassen Sie uns das Ausführungsergebnis des Programms mit C ++, Java, Python messen.
Ändern des Aufbewahrungszeitraums für CloudWatch-Protokolle in Lambda
[Linux] [Kernelmodul] Geben Sie die Ausführungs-CPU von kthread an / begrenzen Sie sie
Richten Sie die Version von chromedriver_binary aus
Scraping das Ergebnis von "Schedule-Kun"
10. Zählen der Anzahl der Zeilen
Die Geschichte des Baus von Zabbix 4.4
Auf dem Weg zum Ruhestand von Python2
Vergleichen Sie die Schriftarten von Jupyter-Themen
Holen Sie sich die Anzahl der Ziffern
Erläutern Sie den Code von Tensorflow_in_ROS
Verwenden Sie die Clustering-Ergebnisse erneut
GoPiGo3 des alten Mannes
Berechnen Sie die Anzahl der Änderungen
Ändern Sie das Thema von Jupyter
Die Popularität von Programmiersprachen
Ändern Sie den Stil von matplotlib
Visualisieren Sie die Flugbahn von Hayabusa 2
Über die Komponenten von Luigi
Verknüpfte Komponenten des Diagramms
Filtern Sie die Ausgabe von tracemalloc
Über die Funktionen von Python
Simulation des Inhalts der Brieftasche
Die Kraft der Pandas: Python
Ich möchte die Ausführungszeit aufzeichnen und ein Protokoll führen.
So ändern Sie die Protokollstufe von Azure SDK für Python
Werfen wir einen Blick auf die Feature-Map von YOLO v3
Ablauf des Ergebnisses der asynchronen Verarbeitung mit Django und Sellerie
[Hikari-Python] Kapitel 07-02 Ausnahmebehandlung (Kontinuierliche Ausführung des Programms durch Ausnahmebehandlung)
[Python3] Definition eines Dekorators, der die Ausführungszeit einer Funktion misst
So überwachen Sie den Ausführungsstatus von sqlldr mit dem Befehl pv
Informationen zur SystemChannels-API, um die plattformspezifischen Funktionen von Flutter zu nutzen
Protokollieren Sie die Omron-Umgebungssensorwerte regelmäßig mit Raspberry Pi