测试celery任务是否仍在处理中

11 投票
2 回答
5103 浏览
提问于 2025-04-16 07:56

我该如何测试一个任务(task_id)在celery中是否还在处理呢?我有以下的情况:

  1. 在Django的视图中启动一个任务
  2. 把BaseAsyncResult存储在会话中
  3. 强制关闭celery的后台进程,这样任务就不再被处理了
  4. 检查这个任务是否“死掉”了

有什么想法吗?我能否查看所有正在被celery处理的任务,看看我的任务是否还在?

2 个回答

0

我觉得有比把任务对象存储在模型里更好的方法。例如,如果你想检查一组任务(并行执行的)是否完成:

# in the script you launch the task
from celery import group

job = group(
    task1.s(param1, param2),
    task2.s(param3, param4)
)
result = job.apply_async()
result.save()

# save the result ID in your model
your_obj_model = YourModel.objects.get(id='1234')
your_obj_model.task_id = result.id
your_obj_model.save()

那么在你的视图中

from celery.result import GroupResult
# ...
task_result = GroupResult.restore(your_obj_model.task_id)
task_finished = task_result.ready()
# will be True or False
3

在你的模型里定义一个字段(PickledObjectField),用来存储celery任务:

class YourModel(models.Model):
    .
    .
    celery_task = PickledObjectField()
    .
    .

    def task():
        self.celery_task = SubmitTask.apply_async(args = self.task_detail())
        self.save()

如果你的任务不特定于某个模型,那你应该专门创建一个模型来处理celery任务。

否则,我建议你使用django-celery。它有一个很不错的监控功能:
http://ask.github.com/celery/userguide/monitoring.html#django-admin-monitor,可以把任务的详细信息以图形化的方式保存在django模型里。

撰写回答