[PYTHON] Workflow design for Celery 3.1

Notes taken while reading the official Celery docs and digesting the content.

The primitives

Map, Starmap, and Chunks are not covered here.

Chains

Execute tasks in series. Subsequent tasks receive the execution result of the previous task.

**Pay attention to the signature of each task.**

from celery import chain

# The result of `add.s(4, 4)` is passed to `mul.s(8)`, and that result is passed to `mul.s(10)`
chain(add.s(4, 4), mul.s(8), mul.s(10))
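Since a chain simply pipes each result into the next signature, its semantics can be sketched in plain Python without a broker (the `add`, `mul`, and `run_chain` names below are illustrative helpers, not Celery API):

```python
from functools import reduce

def add(x, y):
    return x + y

def mul(x, y):
    return x * y

def run_chain(first_result, *partials):
    # each partial is (func, extra_args); the previous result is prepended
    return reduce(lambda acc, p: p[0](acc, *p[1]), partials, first_result)

# equivalent to chain(add.s(4, 4), mul.s(8), mul.s(10))()
result = run_chain(add(4, 4), (mul, (8,)), (mul, (10,)))
print(result)  # 640
```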

Groups

Execute multiple tasks in parallel.

from celery import group

# `task.s(2, 2)` and `task.s(4, 4)` are executed in parallel
group(
	task.s(2, 2),
	task.s(4, 4)
)
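Semantically, a group is a parallel map over independent signatures. A plain-Python sketch of what it computes (a real group distributes the calls across workers; the `task` function here is made up):

```python
def task(x, y):
    return x + y

# equivalent to group(task.s(2, 2), task.s(4, 4))().get()
signatures = [(task, (2, 2)), (task, (4, 4))]
results = [func(*args) for func, args in signatures]
print(results)  # [4, 8]
```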

Chords

Execution results of multiple tasks can be passed to the callback.

from celery import chord

# Pass the multiple execution results to tsum.s()
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()

In this example, `add.s(i, i) for i in xrange(100)` produces the tasks to be executed in parallel, and the list of execution results is passed to the callback `tsum.s()`.
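A chord is a group plus a callback that receives the collected results. The same computation in plain Python, no broker needed (`tsum` here mirrors the task of that name from the docs):

```python
def add(x, y):
    return x + y

def tsum(results):
    return sum(results)

# equivalent to chord(add.s(i, i) for i in range(100))(tsum.s()).get()
header_results = [add(i, i) for i in range(100)]  # the parallel group
total = tsum(header_results)                      # the callback
print(total)  # 9900
```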

Combinations

1 pre-processor + n post-processors

This is what I wanted to do, so I looked it up.

On a timeline: the pre-processing task runs first, then the post-processing tasks run in parallel.

For example: saving an image as the pre-processing step, with post-processing tasks such as image manipulation and resizing that can run at the same time.

from celery import task

@task
def main_task(object_id):
    # pre-processing task

    # some processing here

    # return object_id for the subsequent tasks
    return object_id

@task
def sub_task_1(object_id):
    # post-processing task
    pass

@task
def sub_task_2(object_id):
    # post-processing task
    pass


# Build the overall task chain, combining chain() and group()
chains = chain(
    main_task.s(object.pk),
    group(
        sub_task_1.s(),
        sub_task_2.s()
    )
)

# Run the chain
# After main_task completes, sub_task_1 and sub_task_2 are executed in parallel
chains.apply_async()

The point is to make the signatures of the subsequent tasks in the group match the return value of the preceding task.
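The fan-out semantics above can be sketched in plain Python: the previous result in the chain is prepended to the arguments of every signature in the group. The sub-task bodies below are invented for illustration:

```python
def main_task(object_id):
    # pre-processing; returns the id for the subsequent tasks
    return object_id

def sub_task_1(object_id):
    return ('resized', object_id)

def sub_task_2(object_id):
    return ('thumbnailed', object_id)

# chain(main_task.s(123), group(sub_task_1.s(), sub_task_2.s())):
# the return value of main_task becomes the first argument of each group member
previous = main_task(123)
results = [sub(previous) for sub in (sub_task_1, sub_task_2)]
print(results)  # [('resized', 123), ('thumbnailed', 123)]
```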

1 pre-processor + mapped post-processors

The pre-processing step produces multiple outputs, and multiple subsequent tasks run in parallel. Similar to the previous flow, except that the pre-processing task outputs a variable number of results.


from celery import task, subtask, group

@task
def pre_process_task(object_id):
    # pre-processing task

    # some processing here

    # return the list of objects produced as the result (variable length)
    return [1, 2, 3, 4, 5]

@task
def post_process_task(object_id):
    # post-processing task
    # designed to receive an individual object
    pass


@task
def dmap(it, callback):
    # receive the list and pass each element to the callback
    callback = subtask(callback)
    return group(callback.clone([arg, ]) for arg in it)()


# Build the overall task chain
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s())
)

# Run the chain
# After pre_process_task completes, post_process_task runs in parallel for each result
chains.apply_async()
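What `dmap` does can be simulated in plain Python: clone the callback once per element of the list and run the clones in parallel. The concrete return values below are invented for illustration:

```python
def pre_process_task(object_id):
    # hypothetical: pretend the object expands into five work items
    return [object_id * 10 + i for i in range(5)]

def post_process_task(item):
    return item * 2

def dmap(items, callback):
    # one callback call per element, like subtask(callback).clone([arg]) in the Celery task
    return [callback(item) for item in items]

items = pre_process_task(1)
results = dmap(items, post_process_task)
print(results)  # [20, 22, 24, 26, 28]
```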

Completion task

Using si() creates an immutable signature, so the task runs while ignoring the return value of the previous task.


from celery import task, chain, group

@task
def main_task(object_id):
    # some task
    return (object_id, result)

@task
def sub_task_1(args):
    # some task
    object_id, result = args
    return True

@task
def sub_task_2(args):
    # some task
    object_id, result = args
    return True

@task
def finalize_task(object_id):
    # log task completion
    logger.info('Task completed')
    return True
    

object_id = 123

chain(
    main_task.s(object_id),
    group(
        sub_task_1.s(),  # uses the return value of main_task
        sub_task_2.s()   # uses the return value of main_task
    ),
    finalize_task.si(object_id)  # note: si(), not s()
).apply_async()
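The difference can be sketched with plain Python closures: an immutable signature freezes its arguments and discards whatever the previous task returned (the `immutable` helper below is illustrative, not Celery API):

```python
def finalize_task(object_id):
    return 'completed: %s' % object_id

def immutable(func, *args):
    # like finalize_task.si(123): arguments are frozen at creation time
    def call(_previous_result=None):
        return func(*args)  # ignores the previous task's return value
    return call

sig = immutable(finalize_task, 123)
# the group's results [True, True] are passed in, but dropped
print(sig([True, True]))  # completed: 123
```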
