[PYTHON] Prenez le journal d'exécution du céleri

Origine

J'utilise souvent la combinaison de Django + Redis (parfois ElastiCashe) + Celery.

Je fais ça comme ça.

C'est facile à écrire, mais si vous voulez savoir si cela a coulé, vous pouvez consulter le fichier journal s'il n'y a qu'un seul ouvrier Celery, mais s'il y a plusieurs ouvriers Celery, il est difficile de savoir où il s'est écoulé.

Je veux savoir quoi demander et ce qui ne va pas

Alors, sortons le journal d'exécution de request-> success / échec dans le modèle de RDB afin qu'il puisse être confirmé sur le site d'administration.

Bien sûr, même avec un seul ouvrier Celery, c'est plus facile que de suivre un fichier journal.

J'ai déjà écrit beaucoup de tâches de céleri, donc je veux le rendre aussi simple que possible.

Tâche source d'héritage

Normalement, vous commencez à écrire avec from celery import Task, mais écrivons une tâche qui prend en charge la gestion des erreurs.

L'appelant utilise apply_async (), qui est appelée après delay (), pour enregistrer la requête. Le côté appelé enregistre on_success () quand il est normal et on_failure () quand il est anormal.

Pour l'utiliser, changez la source d'héritage en Task-> BaseHandlingTask comme SampleTask.

Au fait, si vous créez delay () de manière asynchrone et que vous le changez en run (), rien ne sera enregistré car il est synchrone.

app/tasks/base/handling.py

import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction

from app.models import JobState


class BaseHandlingTask(Task):
    """Tâche de base pour la gestion des erreurs"""
    logger = logging.getLogger('prj')

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
        """Apply tasks asynchronously by sending a message."""
        async_result = None
        try:
            async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
        except:     #Redis n'est pas en place, etc.
            #Si l'appelant est à l'intérieur du bloc atomique, il sera restauré ensemble, utilisez donc la deuxième connexion
            with transaction.atomic(using='force'):
                exc_type, exc_value, exc_traceback = sys.exc_info()
                job_state = JobState()  # task_pas d'identifiant
                job_state.task_name = self.name
                job_state.name = self.name.split('.')[-1]
                if args:
                    job_state.args = json.dumps(list(args))
                if kwargs:
                    job_state.kwargs = json.dumps(kwargs)
                job_state.origin = socket.gethostname()
                job_state.exception_class = exc_value.__class__.__name__
                job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
                job_state.traceback = traceback.format_exc()
                job_state.save(using='force')
            raise

        #Lorsque le démarrage est réussi-Redis est en place. Le céleri est levé / pas levé(is_complete ==Reste faux)
        job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
        job_state.task_name = async_result.task_name
        job_state.name = async_result.task_name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        job_state.origin = socket.gethostname()
        job_state.save()

        return async_result

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler -Manutentionnaire à l'heure normale-Appelé du côté ouvrier du céleri"""
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.is_complete = True
        job_state.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler -Gestionnaire en cas d'erreur-Appelé du côté ouvrier du céleri
            - run()Même à l'intérieur du bloc atomique de, l'enregistrement n'est pas annulé car il est appelé séparément par le worker.
        """
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.exception_class = exc.__class__.__name__
        job_state.exception_msg = str(exc)
        job_state.traceback = str(einfo)
        job_state.save()

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')


class SampleTask(BaseHandlingTask):
    """Exemple de tâche"""
    logger = logging.getLogger('prj')

    def run(self, is_error=False):
        self.logger.info('SampleTask start...')
        with transaction.atomic():
            if is_error:
                raise ValueError('C'est une erreur')
        self.logger.info('SampleTask end.')

Modèle JobState

Il s'agit du modèle vers lequel le journal est généré.

Tu peux voir.

app/models/job_state.py

from django.db import models


class JobState(models.Model):
    """Statut de l'emploi"""
    task_id = models.CharField('ID de tâche', max_length=255, blank=True, null=True, db_index=True)   # UUID
    task_name = models.CharField('Nom de la tâche', max_length=255, blank=True, null=True)     #Exemple: app.tasks.handling.SampleTask
    name = models.CharField('nom de la classe', max_length=255, blank=True, null=True)          #Exemple: SampleTask
    args = models.TextField('args', null=True, blank=True)
    kwargs = models.TextField('kwargs', null=True, blank=True)
    is_complete = models.BooleanField('Terminé', default=False)
    origin = models.CharField('origin', max_length=255, blank=True, null=True)  # Name of host that sent this task.
    hostname = models.CharField('hostname', max_length=255, blank=True, null=True)  # Node name of the worker instance executing the task.
    exception_class = models.CharField('Classe d'exception', max_length=255, null=True, blank=True, default='')
    exception_msg = models.CharField('Message d'exception', max_length=255, null=True, blank=True, default='')
    traceback = models.TextField('traceback', null=True, blank=True, default='')

    created_at = models.DateTimeField('Date d'enregistrement', auto_now_add=True, blank=True, null=True)
    updated_at = models.DateTimeField('Mettre à jour la date et l'heure', auto_now=True, blank=True, null=True)

    def __str__(self):
        return self.task_id if self.task_id else str(self.id)

inscription au site d'administration

Rendez JobState visible sur le site d'administration. S'il est laissé tel quel, il sera plus long, il peut donc être raccourci à certains endroits.

app/admin.py

from django.contrib import admin
from app.models import JobState

class JobStateAdmin(admin.ModelAdmin):
    """Statut de l'emploi"""
    list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
    list_display_links = ('id', 'task_id_shorten', 'name')
    list_filter = ('is_complete',)
    search_fields = ['task_id', 'task_name', 'name']

    def task_id_shorten(self, obj):
        return obj.task_id[:8] + '...' if obj.task_id else ''
    task_id_shorten.short_description = 'ID de tâche'

    def x_args(self, obj):
        return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
    x_args.short_description = 'args'

    def x_kwargs(self, obj):
        return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
    x_kwargs.short_description = 'kwargs'

admin.site.register(JobState, JobStateAdmin)

settings

Si l'appelant est transaction.atomic (), JobState sera également annulé en cas d'erreur, donc c'est un peu la force brute, mais nous augmentons une autre connexion à la base de données pour les écritures forcées.

Si vous n'êtes pas obligé d'aller aussi loin, vous pouvez le faire.

Ajoutez ce qui suit sous DATABASE = {}

prj/settings/local.py

#Établissez une deuxième connexion avec les mêmes paramètres afin de pouvoir écrire dans JobState même si vous faites une exception dans le bloc atmic
DATABASES.update({'force': DATABASES['default']})

c'est tout

C'est un code approximatif, mais maintenant vous pouvez le voir en un coup d'œil.

Si c'est trop pour inclure le temps normal, commentez l'endroit qui est mis dehors dans le temps normal.

De plus, comme il y a trop de tâches cron (battements de céleri) qui s'écoulent toutes les 15 minutes, n'héritez pas de BaseHandlingTask en premier lieu, mais héritez de la tâche simple.

Veuillez ajuster avec etc.

Recommended Posts

Prenez le journal d'exécution du céleri
Paramètre pour afficher le journal de l'exécution de cron
Prenez la logarithmique des éléments non nuls dans scipy.sparse
Créer un environnement d'exécution pour Jupyter Lab
Visualisez les données d'exportation du journal Piyo
Préparer l'environnement d'exécution de Python3 avec Docker
L'inexactitude de Tensorflow était due à log (0)
De l'introduction de pyethapp à l'exécution du contrat
Le début de cif2cell
Je veux grep le résultat de l'exécution de strace
Le sens de soi
Jetez un œil au traitement de LightGBM Tuner
Prenez des captures d'écran LCD avec Python-LEGO Mindstorms
le zen de Python
Prenez la valeur du thermo-hygromètre SwitchBot avec Raspberry Pi
L'histoire de sys.path.append ()
Changer les valeurs du thermo-hygromètre Bot avec Raspberry Pi
Préparation de l'environnement d'exécution de PyTorch avec Docker Novembre 2019
Reproduire l'exemple d'exécution du chapitre 4 de Hajipata en Python
Mesure du temps d'exécution
Reproduire l'exemple d'exécution du chapitre 5 de Hajipata en Python
La vengeance des types: la vengeance des types
Mesurons le résultat de l'exécution du programme avec C ++, Java, Python.
Modifier la période de conservation des journaux CloudWatch Logs dans Lambda
[Linux] [module du noyau] Spécifier / limiter le CPU d'exécution de kthread
Aligner la version de chromedriver_binary
Grattage du résultat de "Schedule-kun"
10. Compter le nombre de lignes
L'histoire de la construction de Zabbix 4.4
Vers la retraite de Python2
Comparez les polices de jupyter-themes
Obtenez le nombre de chiffres
Expliquez le code de Tensorflow_in_ROS
Réutiliser les résultats du clustering
GoPiGo3 du vieil homme
Calculez le nombre de changements
Changer le thème de Jupyter
La popularité des langages de programmation
Changer le style de matplotlib
Visualisez la trajectoire de Hayabusa 2
À propos des composants de Luigi
Composants liés du graphique
Filtrer la sortie de tracemalloc
À propos des fonctionnalités de Python
Simulation du contenu du portefeuille
Le pouvoir des pandas: Python
Je souhaite enregistrer l'heure d'exécution et conserver un journal.
Comment modifier le niveau de journalisation d'Azure SDK pour Python
Jetons un coup d'œil à la carte des fonctionnalités de YOLO v3
Flux d'obtention du résultat du traitement asynchrone à l'aide de Django et Celery
[Hikari-Python] Chapitre 07-02 Gestion des exceptions (exécution continue du programme par gestion des exceptions)
[Python3] Définition d'un décorateur qui mesure le temps d'exécution d'une fonction
Comment surveiller l'état d'exécution de sqlldr avec la commande pv
À propos de l'API SystemChannels pour tirer parti des fonctionnalités spécifiques à la plate-forme Flutter
Consigner périodiquement les valeurs des capteurs d'environnement Omron avec Raspberry Pi