芹菜任务树模块

celery-tasktree的Python项目详细描述


芹菜任务树是一个帮助执行芹菜任务树的模块。 以特定顺序异步地。当 任务和依赖项的数量增长,以及基于回调的简单方法 变得难以理解和维护。

使用示例

from celery_tasktree import task_with_callbacks, TaskTree

@task_with_callbacks
def some_action(...):
    ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(some_action, args=[...], kwargs={...})
    task1 = tree.add_task(some_action, args=[...], kwargs={...})
    task10 = task1.add_task(some_action, args=[...], kwargs={...})
    task11 = task1.add_task(some_action, args=[...], kwargs={...})
    task110 = task11.add_task(some_action, args=[...], kwargs={...})
    async_result = tree.apply_async()
    return async_result

应该使用名为task_with_callbacks的装饰符,而不是简单的芹菜 task装饰器。

根据代码:

  • task0和task1同时执行
  • task10和task11在task1之后同时执行
  • task110在task11之后执行

注意事项:

  • 无法阻止执行的传播,也无法 从祖先向子任务传递额外的参数。简而言之,只有一个 任务之间的依赖关系:执行顺序的依赖关系。

  • 如果子任务(函数)返回值是对象,则名为 “异步结果”将被添加到该对象中,以便 使用join()收集有序的任务结果。扩展前面的示例:

    async_result = execute_actions()
    task0_result, task1_result = async_result.join()
    task10_result, task11_result = task1_result.async_result.join()
    task110_result = task11_result.async_result.join()
    

子类化带有回调的芹菜.task.task

@taskdecorator装饰函数是最简单的,但不是唯一的 创建新的Task子类的一种方法。有时更方便 将泛型celery.task.Task类划分为子类,并重新定义其run()方法。 为了使这样的类与tasktree兼容,run应该用 celery_tasktree.run_with_callbacks装饰器。下面的例子 说明了这种方法:

from celery.task import Task
from celery_tasktree import run_with_callbacks, TaskTree

class SomeActionTask(Task):

    @run_with_callbacks
    def run(self, ...):
        ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(SomeActionTask, args=[...], kwargs={...})
    task01 = task0.add_task(SomeActionTask, args=[...], kwargs={...})
    tree.apply_async()

将tasktree用作简单队列

在很多情况下,一个完全成熟的任务树对你来说是过分的。你们所有人 需要将两个或多个任务添加到队列中,以确保它们将 按顺序执行。允许此任务树有push()pop() 方法实际上只是add_task()周围的包装器。 push()方法将新任务作为子任务添加到已创建的任务中。 而pop()从任务堆栈的尾部移除并返回任务。 用法示例如下:

# create the tree
tree = TaskTree()
# push a number of tasks into it
tree.push(action1, args=[...], kwargs={...})
tree.push(action2, args=[...], kwargs={...})
tree.push(actionX, args=[...], kwargs={...})
tree.pop() # get back action X from the queue
tree.push(action3, args=[...], kwargs={...})
# apply asynchronously
tree.apply_async()

操作将按action1 -> action2 -> action3顺序执行。

任务树外具有回调的任务

task_with_callbacksdecorator本身可能很有用。它装饰着 功能与普通的task芹菜装饰器相同,但是 添加可选的callback参数。

回调可以是子任务或子任务列表(而不是任务集)。背后 场景中,当调用带有回调的任务时,它执行函数的主代码, 然后构建一个任务集,异步调用它并附加 TaskSetResut作为函数返回名为async_result的属性 价值。

下面提供了一个简单的示例:

from celery_tasktree import task_with_callbacks

@task_with_callbacks
def some_action(...):
    ...

cb1 = some_action.subtask(...)
cb2 = some_action.subtask(...)
async_result = some_action.delay(..., callback=[cb1, cb2])
main_result = async_result.wait()
cb1_result, cb2_result = main_result.async_result.join()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Oracle将休眠为ISO 8601日期格式   当有线程时,swing计时器不会停止。睡在Java里面   如何使用swing在java中清空密码字段值(字符串)   如何在编辑文本字段上设置单词java(安卓)   单独类中的java OkHttp请求   java Tomcat配置文件/上下文xml似乎已经崩溃了。请确保它是可分析和有效的。有关详细信息,请参阅服务器日志   java在科尔多瓦的ActivityResult上传递   java如何在映射中保持插入顺序。工厂?   “DataOutputStream”和“ObjectOutputStream”之间的java差异   java从FTP文件列表中获取项目的时间戳   java如何在spring security中为每个人忽略一些资源/URL?   模板类嵌套时新的Java泛型类构造函数问题   java读取并查找文件大小为1GB的行   java如何使用字符串say“stop”停止整数格式的while循环   java是否可以在应用程序启动之间将JVM保留在内存中?   java Springboot出现“出现意外错误(类型=内部服务器错误,状态=500)”的问题