Was passiert beim Lesen des Sellerie-Dokuments? Kauen Sie sich beim Lesen des offiziellen Dokuments.
The primitives
Map & Starmap, Chunks wurden nicht untersucht.
Chains
Führen Sie Aufgaben in Serie aus. Nachfolgende Aufgaben erhalten das Ausführungsergebnis der vorherigen Aufgabe.
** Achten Sie auf die Unterschrift jeder Aufgabe. ** ** **
from celery import chain
# `add.s(4, 4)`Das Ergebnis von`mul.s(8)`Überqueren Sie zu.そDas Ergebnis von`mul.s(10)`Überqueren Sie zu.
chain(add.s(4, 4), mul.s(8), mul.s(10))
Groups
Führen Sie mehrere Aufgaben gleichzeitig aus.
from celery import group
# `task.s(2, 2)`Wann`task.s(4, 4)`Wird parallel ausgeführt
group(
task.s(2, 2),
task.s(4, 4)
)
Chords
Die Ausführungsergebnisse mehrerer Aufgaben können an den Rückruf übergeben werden.
from celery import chord
#Tsum mehrere Ausführungsergebnisse.s()Übergeben an
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()
In diesem Beispiel ist "add.s (i, i) für i in xrange (100)" eine Aufgabe, die parallel ausgeführt werden soll, und das Ausführungsergebnis (Liste?) Wird an den Rückruf "tsum.s ()" übergeben.
1 pre-processor + n post-processors
Ich wollte das machen und habe es nachgeschlagen!
Es sieht so aus als Zeitskala.
Übernehmen Sie Aufgaben wie das gleichzeitige Ausführen des Speicherns von Bildern als Vorverarbeitung, das Verarbeiten von Bildern als Nachbearbeitung, das Ändern der Größe usw.
@task
def main_task(object_id):
#Vorverarbeitungsaufgaben
#Gute Verarbeitung
#Objekt für nachfolgende Aufgaben_Gibt die ID zurück
return object_id
@task
def sub_task_1(object_id):
#Nachbearbeitungsaufgaben
pass
@task
def sub_task_2(object_id):
#Nachbearbeitungsaufgaben
pass
#Bauen Sie eine Kette ganzer Aufgaben auf. Kette()Und Gruppe.
chains = chain(
main_task.s(object.pk),
group(
sub_task_1.s(),
sub_task_2.s()
)
)
#Führen Sie die Kette aus
# main_Nachdem die Ausführung der Aufgabe abgeschlossen ist, wird sub_task_1, sub_task_2 wird parallel ausgeführt.
chains.apply_async()
Es geht darum, die Signaturen abzugleichen, die von nachfolgenden Aufgaben in der Gruppe empfangen wurden.
1 processor + mapped post processors
Führen Sie mehrere Ausgaben der Vorverarbeitung durch und führen Sie mehrere nachfolgende Aufgaben parallel aus. Ähnlich wie beim vorherigen Ablauf, außer dass der Vorprozess mehrere Ergebnisse ausgibt (unbestimmte Anzahl).
@task
def pre_process_task(object_id):
#Vorverarbeitungsaufgaben
#Gute Verarbeitung
#
return [1, 2, 3, 4, 5 ...]
@task
def post_process_task(object_id):
#
#
pass
@task
def dmap(it, callback):
#
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
#
chains = chain(
pre_process_task.s(object.pk),
dmap.s(post_process_task.s()
)
#
# pre_process_Nachbearbeitungsaufgaben Entwerfen zum Empfangen einzelner Objekte Empfangen einer Liste und Übergeben an callbak Erstellen einer Kette ganzer Aufgaben Ausführen einer Kette Nachdem die Aufgabe ausgeführt wurde, veröffentlichen Sie sie_process_Aufgabenprozesse parallel
chains.apply_async()
Completed task
Wenn Sie "si ()" verwenden, wird es zu einer unveränderlichen Aufgabe, sodass Sie die Aufgabe ausführen können, indem Sie den Rückgabewert der vorherigen Aufgabe ignorieren.
@task
def main_task(object_id):
#Eine Aufgabe
return (object_id, result)
@task
def sub_task_1(args):
#Eine Aufgabe
object_id, result = args
return True
@task
def sub_task_2(args):
#Eine Aufgabe
object_id, result = args
return True
@task
def finalize_task(object_id):
#Abschlussprotokoll der Ausgabeaufgabe
logger.info('Task completed')
return True
object_id = 123
chain(
main_task.s(object_id),
group(
sub_task_1.s(), # main_Verwenden Sie den Rückgabewert der Aufgabe
sub_task_2.s() # main_Verwenden Sie den Rückgabewert der Aufgabe
),
main_completed_task.si(object_id) # s()Nicht si()Beachten Sie, dass
).apply_async()