如何通过任务名称检查和取消Celery任务
我正在使用Celery(3.0.15)和Redis作为消息中间件。
有没有简单的方法可以查询在Celery队列中,有多少个特定名称的任务?
另外,是否有办法取消在Celery队列中所有特定名称的任务?
我查看过监控和管理指南,但没有找到解决方案。
5 个回答
和往常一样,使用Celery的时候,这里没有一个答案对我有用,所以我照旧自己动手,搞了个直接查看redis的解决方案。下面就是我的做法……
# First, get a list of tasks from redis:
import redis, json
r = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# Now import the task you want so you can get its name
from my_django.tasks import my_task
# Now, import your celery app and iterate over all tasks
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
task_headers = json.loads(task)['headers']
task_name = task_headers["task"]
if task_name == my_task.name:
task_id = task_headers['id']
print("Terminating: %s" % task_id)
app.control.revoke(task_id, terminate=True)
请注意,以这种方式撤销任务可能无法撤销已经预取的任务,所以你可能不会立即看到结果。
另外,这个方法不支持优先级任务。如果你想修改它来支持优先级任务,可以参考我在另一篇关于redis的回答中的一些建议。
有一个问题是之前的回答没有提到的,如果不注意可能会让人困惑。
在已经发布的解决方案中,我会选择Danielle的方案,不过我会做一个小修改:我会把任务导入到我的文件中,并使用它的.name
属性来获取任务名称,以便传递给.tasks_by_type()
。
app.control.revoke(
[uuid for uuid, _ in
celery.events.state.State().tasks_by_type(task.name)])
但是,这个方案会忽略那些已经安排在未来执行的任务。就像一些人在其他回答中评论的那样,当我检查.tasks_by_type()
返回的结果时,发现是一个空列表。实际上我的队列是空的。但我知道有一些任务是安排在未来执行的,而这些任务才是我主要关注的。我可以通过执行celery -A [app] inspect scheduled
来查看这些任务,但上面的代码并没有影响到它们。
我通过以下方式成功撤销了这些已安排的任务:
app.control.revoke(
[scheduled["request"]["id"] for scheduled in
chain.from_iterable(app.control.inspect().scheduled()
.itervalues())])
app.control.inspect().scheduled()
会返回一个字典,字典的键是工作者的名字,值是调度信息的列表(所以需要使用chain.from_iterable
,这个是从itertools
导入的)。任务信息在调度信息的"request"
字段中,而"id"
包含任务的ID。需要注意的是,即使在撤销之后,已安排的任务仍然会显示在已安排的任务列表中。被撤销的已安排任务不会从列表中移除,直到它们的计时器到期,或者Celery执行某些清理操作。(重启工作者会触发这样的清理。)
这段代码是用来处理一些数据的。它的主要功能是从一个地方获取数据,然后对这些数据进行一些操作,最后把结果输出到另一个地方。具体来说,它可能会读取文件、处理字符串或者进行数学计算。整个过程就像是一个流水线,数据从一端进来,经过一系列的处理,最后从另一端流出。
在编程中,我们经常会用到这样的代码块,它们帮助我们自动化一些重复的任务,让我们可以更高效地工作。
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)
# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
celery.control.revoke(uuid, terminate=True)