[PYTHON] Sellerie 3.1 Workflow-Design

Was passiert beim Lesen des Sellerie-Dokuments? Kauen Sie sich beim Lesen des offiziellen Dokuments.

The primitives

Map & Starmap, Chunks wurden nicht untersucht.

Chains

Führen Sie Aufgaben in Serie aus. Nachfolgende Aufgaben erhalten das Ausführungsergebnis der vorherigen Aufgabe.

** Achten Sie auf die Unterschrift jeder Aufgabe. ** ** **

from celery import chain

# `add.s(4, 4)`Das Ergebnis von`mul.s(8)`Überqueren Sie zu.そDas Ergebnis von`mul.s(10)`Überqueren Sie zu.
chain(add.s(4, 4), mul.s(8), mul.s(10))

Groups

Führen Sie mehrere Aufgaben gleichzeitig aus.

from celery import group

# `task.s(2, 2)`Wann`task.s(4, 4)`Wird parallel ausgeführt
group(
	task.s(2, 2),
	task.s(4, 4)
)

Chords

Die Ausführungsergebnisse mehrerer Aufgaben können an den Rückruf übergeben werden.

from celery import chord

#Tsum mehrere Ausführungsergebnisse.s()Übergeben an
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()

In diesem Beispiel ist "add.s (i, i) für i in xrange (100)" eine Aufgabe, die parallel ausgeführt werden soll, und das Ausführungsergebnis (Liste?) Wird an den Rückruf "tsum.s ()" übergeben.

Kombination

1 pre-processor + n post-processors

Ich wollte das machen und habe es nachgeschlagen!

Es sieht so aus als Zeitskala.

kobito.1423119852.301252.png

Übernehmen Sie Aufgaben wie das gleichzeitige Ausführen des Speicherns von Bildern als Vorverarbeitung, das Verarbeiten von Bildern als Nachbearbeitung, das Ändern der Größe usw.

@task
def main_task(object_id):
    #Vorverarbeitungsaufgaben
    
    #Gute Verarbeitung
    
    #Objekt für nachfolgende Aufgaben_Gibt die ID zurück
    return object_id
    
@task
def sub_task_1(object_id):
    #Nachbearbeitungsaufgaben
    pass

@task
def sub_task_2(object_id):
    #Nachbearbeitungsaufgaben
    pass


#Bauen Sie eine Kette ganzer Aufgaben auf. Kette()Und Gruppe.
chains = chain(
    main_task.s(object.pk),
    group(
    	sub_task_1.s(),
    	sub_task_2.s()
    )
)

#Führen Sie die Kette aus
# main_Nachdem die Ausführung der Aufgabe abgeschlossen ist, wird sub_task_1, sub_task_2 wird parallel ausgeführt.
chains.apply_async()

Es geht darum, die Signaturen abzugleichen, die von nachfolgenden Aufgaben in der Gruppe empfangen wurden.

1 processor + mapped post processors

Führen Sie mehrere Ausgaben der Vorverarbeitung durch und führen Sie mehrere nachfolgende Aufgaben parallel aus. Ähnlich wie beim vorherigen Ablauf, außer dass der Vorprozess mehrere Ergebnisse ausgibt (unbestimmte Anzahl).

kobito.1423120577.848210.png

@task
def pre_process_task(object_id):
    #Vorverarbeitungsaufgaben

    #Gute Verarbeitung

    #
	return [1, 2, 3, 4, 5 ...]

@task
def post_process_task(object_id):
    #
    #
    pass


@task
def dmap(it, callback):
    #
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()


#
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s()
)

#
# pre_process_Nachbearbeitungsaufgaben Entwerfen zum Empfangen einzelner Objekte Empfangen einer Liste und Übergeben an callbak Erstellen einer Kette ganzer Aufgaben Ausführen einer Kette Nachdem die Aufgabe ausgeführt wurde, veröffentlichen Sie sie_process_Aufgabenprozesse parallel
chains.apply_async()

Completed task

Wenn Sie "si ()" verwenden, wird es zu einer unveränderlichen Aufgabe, sodass Sie die Aufgabe ausführen können, indem Sie den Rückgabewert der vorherigen Aufgabe ignorieren.

Kobito.v0wo8Y.png

@task
def main_task(object_id):
    #Eine Aufgabe
    return (object_id, result)
    
@task
def sub_task_1(args):
    #Eine Aufgabe
    object_id, result = args
    return True
    
@task
def sub_task_2(args):
    #Eine Aufgabe
    object_id, result = args
    return True
    
@task
def finalize_task(object_id):
    #Abschlussprotokoll der Ausgabeaufgabe
    logger.info('Task completed')
    return True
    

object_id = 123

chain(
    main_task.s(object_id),
    group(
    	sub_task_1.s(),  # main_Verwenden Sie den Rückgabewert der Aufgabe
    	sub_task_2.s()   # main_Verwenden Sie den Rückgabewert der Aufgabe
    ),
    main_completed_task.si(object_id)       # s()Nicht si()Beachten Sie, dass
).apply_async()

Referenz

Recommended Posts

Sellerie 3.1 Workflow-Design
Sellerie
[Python] Webanwendungsdesign für maschinelles Lernen