芹菜任务树模块

celery-tasktree的Python项目详细描述


芹菜任务树是一个帮助执行芹菜任务树的模块。 以特定顺序异步地。当 任务和依赖项的数量增长,以及基于回调的简单方法 变得难以理解和维护。

使用示例

from celery_tasktree import task_with_callbacks, TaskTree

@task_with_callbacks
def some_action(...):
    ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(some_action, args=[...], kwargs={...})
    task1 = tree.add_task(some_action, args=[...], kwargs={...})
    task10 = task1.add_task(some_action, args=[...], kwargs={...})
    task11 = task1.add_task(some_action, args=[...], kwargs={...})
    task110 = task11.add_task(some_action, args=[...], kwargs={...})
    async_result = tree.apply_async()
    return async_result

应该使用名为task_with_callbacks的装饰符,而不是简单的芹菜 task装饰器。

根据代码:

  • task0和task1同时执行
  • task10和task11在task1之后同时执行
  • task110在task11之后执行

注意事项:

  • 无法阻止执行的传播,也无法 从祖先向子任务传递额外的参数。简而言之,只有一个 任务之间的依赖关系:执行顺序的依赖关系。

  • 如果子任务(函数)返回值是对象,则名为 “异步结果”将被添加到该对象中,以便 使用join()收集有序的任务结果。扩展前面的示例:

    async_result = execute_actions()
    task0_result, task1_result = async_result.join()
    task10_result, task11_result = task1_result.async_result.join()
    task110_result = task11_result.async_result.join()
    

子类化带有回调的芹菜.task.task

@taskdecorator装饰函数是最简单的,但不是唯一的 创建新的Task子类的一种方法。有时更方便 将泛型celery.task.Task类划分为子类,并重新定义其run()方法。 为了使这样的类与tasktree兼容,run应该用 celery_tasktree.run_with_callbacks装饰器。下面的例子 说明了这种方法:

from celery.task import Task
from celery_tasktree import run_with_callbacks, TaskTree

class SomeActionTask(Task):

    @run_with_callbacks
    def run(self, ...):
        ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(SomeActionTask, args=[...], kwargs={...})
    task01 = task0.add_task(SomeActionTask, args=[...], kwargs={...})
    tree.apply_async()

将tasktree用作简单队列

在很多情况下,一个完全成熟的任务树对你来说是过分的。你们所有人 需要将两个或多个任务添加到队列中,以确保它们将 按顺序执行。允许此任务树有push()pop() 方法实际上只是add_task()周围的包装器。 push()方法将新任务作为子任务添加到已创建的任务中。 而pop()从任务堆栈的尾部移除并返回任务。 用法示例如下:

# create the tree
tree = TaskTree()
# push a number of tasks into it
tree.push(action1, args=[...], kwargs={...})
tree.push(action2, args=[...], kwargs={...})
tree.push(actionX, args=[...], kwargs={...})
tree.pop() # get back action X from the queue
tree.push(action3, args=[...], kwargs={...})
# apply asynchronously
tree.apply_async()

操作将按action1 -> action2 -> action3顺序执行。

任务树外具有回调的任务

task_with_callbacksdecorator本身可能很有用。它装饰着 功能与普通的task芹菜装饰器相同,但是 添加可选的callback参数。

回调可以是子任务或子任务列表(而不是任务集)。背后 场景中,当调用带有回调的任务时,它执行函数的主代码, 然后构建一个任务集,异步调用它并附加 TaskSetResut作为函数返回名为async_result的属性 价值。

下面提供了一个简单的示例:

from celery_tasktree import task_with_callbacks

@task_with_callbacks
def some_action(...):
    ...

cb1 = some_action.subtask(...)
cb2 = some_action.subtask(...)
async_result = some_action.delay(..., callback=[cb1, cb2])
main_result = async_result.wait()
cb1_result, cb2_result = main_result.async_result.join()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java JavaFX TableView更新单元格,不更新对象值   在扫描器中使用分隔符的java   java OkHttp 4.9.2,连接无法重用,导致端口耗尽   eclipse中的c JNI:运行Java代码   java是否在出厂的所有硬件设备中都有/mnt/sdcard/Android/data文件夹(或等效文件夹)?   Java,在eclipse中访问资源文件夹中的图像   java为什么Bluemix dashDB操作抛出SqlSyntaxErrorException,SQLCODE=1667?   JavaHtmlUnitWebClient。getPage不处理javascript   Google API认证的java问题   java如何将JSON数组反序列化为Apache beam PCollection<javaObject>   ServerSocket停止接收命令,java/安卓   来自Java类的安卓 Toast消息   java如何自动重新加载应用程序引擎开发服务器?   java是否可以尝试/捕获一些东西来检查是否抛出了异常?   java如何做到这一点当我按下load game时,它不仅会加载信息,还会将您带到游戏中?   Java选项Xmx代表什么?   Java映射,它在插入时打印值   设置“ulimit c unlimited”后,java无法生成系统核心转储