在Django数据库中执行前撤销Celery任务
1 个回答
4
因为Celery是通过一个ID来跟踪任务的,所以你只需要知道哪些ID已经被取消了。与其去修改kombu
的内部结构,不如自己创建一个表(或者用memcached
等工具)来专门记录被取消的ID,然后检查当前可取消任务的ID是否在这个表里。
支持远程revoke
命令的传输方式内部就是这么做的:
所有的工作节点都会记住被撤销的任务ID,这些ID可以保存在内存中,也可以保存在硬盘上(参见持久化撤销)。(来自Celery文档)
当你使用django传输时,你需要自己负责这部分。在这种情况下,每个任务都需要检查自己是否被取消。
所以你任务的基本形式(这里用日志代替实际操作)变成了:
from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)
@shared_task
def my_task():
if task_canceled(my_task.request.id):
raise Ignore
logger.info("Doing my stuff")
你可以通过创建一个基础的CancelableTask类来扩展和改进这个功能,就像你链接的其他答案中提到的那样,但这就是基本的形式。现在你缺少的是模型和检查它的函数。
注意,这里的ID是一个字符串ID,比如a5644f08-7d30-43ff-a61e-81c165ad9e19
,而不是一个整数。你的模型可以简单到如下:
from django.db import models
class CanceledTask(models.Model):
task_id = models.CharField(max_length=200)
def cancel_task(request_id):
CanceledTask.objects.create(task_id=request_id)
def task_canceled(request_id):
return CanceledTask.objects.filter(task_id=request_id).exists()
现在你可以通过观察celery服务的调试日志来检查行为,同时进行一些操作,比如:
my_task.delay()
models.cancel_task(my_task.delay())