Celery任务调用另一个任务

1 投票
1 回答
890 浏览
提问于 2025-04-18 15:42

我有两个文件。一个是我程序的主文件,里面包含了所有需要执行的celery任务:

chord(
    tasks.task_01.subtask(task_id='task_01'),
    tasks.task_02.subtask(task_id='task_02')
).delay()

然后我还有一个task.py文件:

@task(bind=True)
def task_01(self, result=None):

    headers = models.Header.objects.all()
    group(extract_emails.subtask((header,)) for header in headers).delay()

最后是extract_emails这个任务:

@task(bind=True)
def extract_emails(header, result=None):

    print header.id  #to check in celery log if the header item is recieved
    url_parser.find_emails(header)

我的目标是执行task_01,让它并行运行一组'extract_emails'任务,并把'header'作为参数传进去。我希望'extract_emails'任务能接收到这个header,并用它运行一些简单的代码。

但是当我尝试这样做时,我遇到了一个错误:AttributeError("'extract_emails'对象没有'id'这个属性",)

这个错误是怎么回事呢?我甚至没有把任务的名字作为参数传递啊!我的代码哪里出问题了?

1 个回答

0

在我的例子中,'extract_emails' 这个任务应该这样写:

@task(bind=True)
def extract_emails(self, result=None):

    header = self.request.args[1] 
    url_parser.find_emails(header)

只需要用 self.request.PARAM 来提取你需要的内容。 更多信息可以在 官方文档 中找到。

撰写回答