从chain-tas生成组任务的芹菜

2024-04-25 14:56:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试用芹菜(v4.0)链接以下任务

task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

上面的部分工作得很好,generate_job_requests作为和弦。 但问题是从execute_job开始的,它从generate_job_requests获取作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。在

我想验证一下芹菜是否可以使用这种任务图?有没有其他可行的工作流程来解决这种依赖性的问题? 我在文档中丢失了任何东西。在


Tags: maptaskexecuteget链接作业groupjob
1条回答
网友
1楼 · 发布于 2024-04-25 14:56:16

我在中间任务创建者中使用了类似地图的功能

@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    callback = subtask(callback)
    grp = group(callback.clone([arg, ]) for arg in it)
    c = (grp | end_task)
    return c()

所以任务流程就这样减少了

^{pr2}$

为了得到任务的最终输出,我做了一些调整

# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()

我提到了answer

相关问题 更多 >