[PYTHON] Conception de flux de travail Celery 3.1

En lisant le document sur le céleri, qu'y a-t-il? Document officiel et à mâcher vous-même.

The primitives

Map & Starmap, Chunks n'ont pas été examinés.

Chains

Exécutez les tâches en série. Les tâches suivantes reçoivent le résultat de l'exécution de la tâche précédente.

** Faites attention à la signature de chaque tâche. ** **

from celery import chain

# `add.s(4, 4)`Le résultat de`mul.s(8)`Passez à.そLe résultat de`mul.s(10)`Passez à.
chain(add.s(4, 4), mul.s(8), mul.s(10))

Groups

Exécutez plusieurs tâches en parallèle.

from celery import group

# `task.s(2, 2)`Quand`task.s(4, 4)`Est exécuté en parallèle
group(
	task.s(2, 2),
	task.s(4, 4)
)

Chords

Les résultats de l'exécution de plusieurs tâches peuvent être transmis au rappel.

from celery import chord

#Résultats d'exécution multiples Tsum.s()Passer au
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()

Dans cet exemple, ʻadd.s (i, i) pour i dans xrange (100) est une tâche à exécuter en parallèle, et le résultat de l'exécution (liste?) Est passé au rappel tsum.s () `.

combinaison

1 pre-processor + n post-processors

Je voulais faire ça et j'ai cherché!

Cela ressemble à une échelle de temps.

kobito.1423119852.301252.png

Assumer des tâches telles que l'exécution de la sauvegarde d'image comme prétraitement, le traitement d'image comme post-traitement, le redimensionnement, etc. en même temps.

@task
def main_task(object_id):
    #Tâches de prétraitement
    
    #Beau traitement
    
    #Objet pour les tâches suivantes_Renvoie l'ID
    return object_id
    
@task
def sub_task_1(object_id):
    #Tâches de post-traitement
    pass

@task
def sub_task_2(object_id):
    #Tâches de post-traitement
    pass


#Construisez une chaîne de tâches entières. chaîne()Et groupe.
chains = chain(
    main_task.s(object.pk),
    group(
    	sub_task_1.s(),
    	sub_task_2.s()
    )
)

#Exécutez la chaîne
# main_Une fois l'exécution de la tâche terminée, sous_task_1, sub_task_2 est exécuté en parallèle.
chains.apply_async()

Le but est de faire correspondre les signatures reçues par les tâches suivantes dans le groupe.

1 processor + mapped post processors

Exécutez plusieurs sorties du prétraitement et exécutez plusieurs tâches suivantes en parallèle. Similaire au flux précédent, sauf que le pré-processus génère plusieurs résultats (nombre indéfini).

kobito.1423120577.848210.png

@task
def pre_process_task(object_id):
    #Tâches de prétraitement

    #Beau traitement

    #Renvoie une liste d'objets qui seront traités
	return [1, 2, 3, 4, 5 ...]

@task
def post_process_task(object_id):
    #Tâches de post-traitement
    #Conception pour recevoir des objets individuels
    pass


@task
def dmap(it, callback):
    #Recevez la liste et transmettez-la à callbak
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()


#Construisez une chaîne de tâches entières
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s()
)

#Exécutez la chaîne
# pre_process_publier une fois l'exécution de la tâche terminée_process_processus de tâches en parallèle
chains.apply_async()

Completed task

Si vous utilisez si (), cela devient une tâche immuable, vous pouvez donc exécuter la tâche en ignorant la valeur de retour de la tâche précédente.

Kobito.v0wo8Y.png

@task
def main_task(object_id):
    #Une tâche
    return (object_id, result)
    
@task
def sub_task_1(args):
    #Une tâche
    object_id, result = args
    return True
    
@task
def sub_task_2(args):
    #Une tâche
    object_id, result = args
    return True
    
@task
def finalize_task(object_id):
    #Journal d'achèvement de la tâche de sortie
    logger.info('Task completed')
    return True
    

object_id = 123

chain(
    main_task.s(object_id),
    group(
    	sub_task_1.s(),  # main_Utiliser la valeur de retour de la tâche
    	sub_task_2.s()   # main_Utiliser la valeur de retour de la tâche
    ),
    main_completed_task.si(object_id)       # s()Pas si()Notez que
).apply_async()

référence

Recommended Posts

Conception de flux de travail Celery 3.1
céleri
[Python] Conception d'applications Web pour l'apprentissage automatique