如何通过任务名称检查和取消Celery任务

39 投票
5 回答
32498 浏览
提问于 2025-04-17 19:59

我正在使用Celery(3.0.15)和Redis作为消息中间件。

有没有简单的方法可以查询在Celery队列中,有多少个特定名称的任务?

另外,是否有办法取消在Celery队列中所有特定名称的任务?

我查看过监控和管理指南,但没有找到解决方案。

5 个回答

6

和往常一样,使用Celery的时候,这里没有一个答案对我有用,所以我照旧自己动手,搞了个直接查看redis的解决方案。下面就是我的做法……

# First, get a list of tasks from redis:
import redis, json

r = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)

# Now import the task you want so you can get its name
from my_django.tasks import my_task

# Now, import your celery app and iterate over all tasks 
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
     task_headers = json.loads(task)['headers']
     task_name = task_headers["task"]
     if task_name == my_task.name:
         task_id = task_headers['id']
         print("Terminating: %s" % task_id)
         app.control.revoke(task_id, terminate=True)

请注意,以这种方式撤销任务可能无法撤销已经预取的任务,所以你可能不会立即看到结果。

另外,这个方法不支持优先级任务。如果你想修改它来支持优先级任务,可以参考我在另一篇关于redis的回答中的一些建议。

22

有一个问题是之前的回答没有提到的,如果不注意可能会让人困惑。

在已经发布的解决方案中,我会选择Danielle的方案,不过我会做一个小修改:我会把任务导入到我的文件中,并使用它的.name属性来获取任务名称,以便传递给.tasks_by_type()

app.control.revoke(
    [uuid for uuid, _ in
     celery.events.state.State().tasks_by_type(task.name)])

但是,这个方案会忽略那些已经安排在未来执行的任务。就像一些人在其他回答中评论的那样,当我检查.tasks_by_type()返回的结果时,发现是一个空列表。实际上我的队列是空的。但我知道有一些任务是安排在未来执行的,而这些任务才是我主要关注的。我可以通过执行celery -A [app] inspect scheduled来查看这些任务,但上面的代码并没有影响到它们。

我通过以下方式成功撤销了这些已安排的任务:

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in
     chain.from_iterable(app.control.inspect().scheduled()
                         .itervalues())])

app.control.inspect().scheduled()会返回一个字典,字典的键是工作者的名字,值是调度信息的列表(所以需要使用chain.from_iterable,这个是从itertools导入的)。任务信息在调度信息的"request"字段中,而"id"包含任务的ID。需要注意的是,即使在撤销之后,已安排的任务仍然会显示在已安排的任务列表中。被撤销的已安排任务不会从列表中移除,直到它们的计时器到期,或者Celery执行某些清理操作。(重启工作者会触发这样的清理。)

42

这段代码是用来处理一些数据的。它的主要功能是从一个地方获取数据,然后对这些数据进行一些操作,最后把结果输出到另一个地方。具体来说,它可能会读取文件、处理字符串或者进行数学计算。整个过程就像是一个流水线,数据从一端进来,经过一系列的处理,最后从另一端流出。

在编程中,我们经常会用到这样的代码块,它们帮助我们自动化一些重复的任务,让我们可以更高效地工作。

# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)

# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
    celery.control.revoke(uuid, terminate=True)

撰写回答