芹菜任务树模块
celery-tasktree的Python项目详细描述
芹菜任务树是一个帮助执行芹菜任务树的模块。 以特定顺序异步地。当 任务和依赖项的数量增长,以及基于回调的简单方法 变得难以理解和维护。
使用示例
from celery_tasktree import task_with_callbacks, TaskTree @task_with_callbacks def some_action(...): ... def execute_actions(): tree = TaskTree() task0 = tree.add_task(some_action, args=[...], kwargs={...}) task1 = tree.add_task(some_action, args=[...], kwargs={...}) task10 = task1.add_task(some_action, args=[...], kwargs={...}) task11 = task1.add_task(some_action, args=[...], kwargs={...}) task110 = task11.add_task(some_action, args=[...], kwargs={...}) async_result = tree.apply_async() return async_result
应该使用名为task_with_callbacks的装饰符,而不是简单的芹菜 task装饰器。
根据代码:
- task0和task1同时执行
- task10和task11在task1之后同时执行
- task110在task11之后执行
注意事项:
无法阻止执行的传播,也无法 从祖先向子任务传递额外的参数。简而言之,只有一个 任务之间的依赖关系:执行顺序的依赖关系。
如果子任务(函数)返回值是对象,则名为 “异步结果”将被添加到该对象中,以便 使用join()收集有序的任务结果。扩展前面的示例:
async_result = execute_actions() task0_result, task1_result = async_result.join() task10_result, task11_result = task1_result.async_result.join() task110_result = task11_result.async_result.join()
子类化带有回调的芹菜.task.task
用@taskdecorator装饰函数是最简单的,但不是唯一的 创建新的Task子类的一种方法。有时更方便 将泛型celery.task.Task类划分为子类,并重新定义其run()方法。 为了使这样的类与tasktree兼容,run应该用 celery_tasktree.run_with_callbacks装饰器。下面的例子 说明了这种方法:
from celery.task import Task from celery_tasktree import run_with_callbacks, TaskTree class SomeActionTask(Task): @run_with_callbacks def run(self, ...): ... def execute_actions(): tree = TaskTree() task0 = tree.add_task(SomeActionTask, args=[...], kwargs={...}) task01 = task0.add_task(SomeActionTask, args=[...], kwargs={...}) tree.apply_async()
将tasktree用作简单队列
在很多情况下,一个完全成熟的任务树对你来说是过分的。你们所有人 需要将两个或多个任务添加到队列中,以确保它们将 按顺序执行。允许此任务树有push()和pop() 方法实际上只是add_task()周围的包装器。 push()方法将新任务作为子任务添加到已创建的任务中。 而pop()从任务堆栈的尾部移除并返回任务。 用法示例如下:
# create the tree tree = TaskTree() # push a number of tasks into it tree.push(action1, args=[...], kwargs={...}) tree.push(action2, args=[...], kwargs={...}) tree.push(actionX, args=[...], kwargs={...}) tree.pop() # get back action X from the queue tree.push(action3, args=[...], kwargs={...}) # apply asynchronously tree.apply_async()
操作将按action1 -> action2 -> action3顺序执行。
任务树外具有回调的任务
task_with_callbacksdecorator本身可能很有用。它装饰着 功能与普通的task芹菜装饰器相同,但是 添加可选的callback参数。
回调可以是子任务或子任务列表(而不是任务集)。背后 场景中,当调用带有回调的任务时,它执行函数的主代码, 然后构建一个任务集,异步调用它并附加 TaskSetResut作为函数返回名为async_result的属性 价值。
下面提供了一个简单的示例:
from celery_tasktree import task_with_callbacks @task_with_callbacks def some_action(...): ... cb1 = some_action.subtask(...) cb2 = some_action.subtask(...) async_result = some_action.delay(..., callback=[cb1, cb2]) main_result = async_result.wait() cb1_result, cb2_result = main_result.async_result.join()