如何取消Celery中的冲突/旧任务?
我正在使用 Celery 和 RabbitMQ。
当一个 Celery 工作进程不可用时,所有的任务都会在 RabbitMQ 中等待。
一旦它上线,这些任务就会立刻被执行。
我能不能想办法防止这种情况发生呢?
比如说,有100个相同的任务在等着 Celery 工作进程,如果这个工作进程上线时,我能否只执行其中的一个任务呢?
2 个回答
1
有两种方法可以做到这一点。
第一种方法是只运行一个工作者,并且并发数设置为1。
celery worker -A your_app -l info -c 1
这个命令会启动一个工作者,并且只允许一个任务同时执行。这是推荐的做法。
第二种方法稍微复杂一些。你需要获取一个锁,然后释放这个锁,以确保只有一个任务在同一时间被执行。
另外,如果你愿意的话,可以使用purge
命令将队列中的所有任务移除。
celery -A your_app purge
2
因为你队列里的所有任务都是一样的,所以更好的做法是只发送一次任务。为了做到这一点,你需要能够跟踪任务是否已经发布,比如:
使用锁,例如:确保任务一次只执行一次
使用自定义任务ID和自定义状态,在任务发布后,例如:
在任务发布时添加自定义状态:
from celery import current_app
from celery.signals import after_task_publish
@after_task_publish.connect
def add_sent_state(sender=None, body=None, **kwargs):
"""Track Published Tasks."""
# get the task instance from its name
task = current_app.tasks.get(sender)
# if there is no task.backend fallback to app.backend
backend = task.backend if task else current_app.backend
# store the task state
backend.store_result(body['id'], None, 'SENT')
当你想发送任务时,可以检查这个任务是否已经发布。因为我们使用了自定义状态,所以任务在发布时状态不会是PENDING
(这可能是不确定的),因此我们可以通过以下方式进行检查:
from celery import states
# the task has a custom ID
task = task_func.AsyncResult('CUSTOM_ID')
if task.state != states.PENDING:
# the task already exists
else:
# send the task
task_func.apply_async(args, kwargs, task_id='CUSTOM_ID')
我在我的应用中使用了这种方法,效果很好。我的任务可以被多次发送,但它们通过ID被识别,这样每个任务只会发送一次。
如果你仍然想取消队列中的所有任务,可以使用:
# import your Celery instance
from project.celery import app
app.control.purge()
查看Celery的常见问题解答 如何清除所有等待的任务?