在Django数据库中执行前撤销Celery任务

1 投票
1 回答
1292 浏览
提问于 2025-04-18 15:24

我在使用Django数据库,而不是RabbitMQ,主要是为了处理并发问题。

但是我遇到了一个问题,就是在任务执行之前如何撤销这个任务。

我找到了一些关于这个问题的答案,但感觉不够完整,或者我没有得到足够的帮助。

我该如何使用模型扩展celery任务表,添加一个布尔字段(revoked),用来标记我不想执行的任务呢?

谢谢。

1 个回答

4

因为Celery是通过一个ID来跟踪任务的,所以你只需要知道哪些ID已经被取消了。与其去修改kombu的内部结构,不如自己创建一个表(或者用memcached等工具)来专门记录被取消的ID,然后检查当前可取消任务的ID是否在这个表里。

支持远程revoke命令的传输方式内部就是这么做的:

所有的工作节点都会记住被撤销的任务ID,这些ID可以保存在内存中,也可以保存在硬盘上(参见持久化撤销)。(来自Celery文档)

当你使用django传输时,你需要自己负责这部分。在这种情况下,每个任务都需要检查自己是否被取消。

所以你任务的基本形式(这里用日志代替实际操作)变成了:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

你可以通过创建一个基础的CancelableTask类来扩展和改进这个功能,就像你链接的其他答案中提到的那样,但这就是基本的形式。现在你缺少的是模型和检查它的函数。

注意,这里的ID是一个字符串ID,比如a5644f08-7d30-43ff-a61e-81c165ad9e19而不是一个整数。你的模型可以简单到如下:

from django.db import models

class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

现在你可以通过观察celery服务的调试日志来检查行为,同时进行一些操作,比如:

my_task.delay()
models.cancel_task(my_task.delay())

撰写回答