如何取消Celery中的冲突/旧任务?

0 投票
2 回答
2164 浏览
提问于 2025-04-29 14:08

我正在使用 Celery 和 RabbitMQ。
当一个 Celery 工作进程不可用时,所有的任务都会在 RabbitMQ 中等待。
一旦它上线,这些任务就会立刻被执行。
我能不能想办法防止这种情况发生呢?

比如说,有100个相同的任务在等着 Celery 工作进程,如果这个工作进程上线时,我能否只执行其中的一个任务呢?

暂无标签

2 个回答

1

有两种方法可以做到这一点。

第一种方法是只运行一个工作者,并且并发数设置为1。

celery worker -A your_app -l info -c 1

这个命令会启动一个工作者,并且只允许一个任务同时执行。这是推荐的做法。

第二种方法稍微复杂一些。你需要获取一个锁,然后释放这个锁,以确保只有一个任务在同一时间被执行

另外,如果你愿意的话,可以使用purge命令将队列中的所有任务移除。

celery -A your_app purge
2

因为你队列里的所有任务都是一样的,所以更好的做法是只发送一次任务。为了做到这一点,你需要能够跟踪任务是否已经发布,比如:

在任务发布时添加自定义状态:

from celery import current_app
from celery.signals import after_task_publish

@after_task_publish.connect
def add_sent_state(sender=None, body=None, **kwargs):
    """Track Published Tasks."""
    # get the task instance from its name
    task = current_app.tasks.get(sender)

    # if there is no task.backend fallback to app.backend
    backend = task.backend if task else current_app.backend

    # store the task state
    backend.store_result(body['id'], None, 'SENT')

当你想发送任务时,可以检查这个任务是否已经发布。因为我们使用了自定义状态,所以任务在发布时状态不会是PENDING(这可能是不确定的),因此我们可以通过以下方式进行检查:

from celery import states

# the task has a custom ID
task = task_func.AsyncResult('CUSTOM_ID')

if task.state != states.PENDING:
    # the task already exists
else:
    # send the task
    task_func.apply_async(args, kwargs, task_id='CUSTOM_ID')

我在我的应用中使用了这种方法,效果很好。我的任务可以被多次发送,但它们通过ID被识别,这样每个任务只会发送一次。

如果你仍然想取消队列中的所有任务,可以使用:

# import your Celery instance
from project.celery import app

app.control.purge()

查看Celery的常见问题解答 如何清除所有等待的任务?

撰写回答