如何为每个Django模型实例调度周期性Celery任务?
我在数据库里有一堆 Feed 对象,我想让每个 Feed 每小时更新一次。我的问题是,我需要确保不会有重复的更新——也就是说,更新不能超过每小时一次,但我也不想让 Feed 等待两个小时才更新。(每小时更新一次,前后几分钟都可以,但如果几分钟内更新两次就不行。)
我正在使用 Django 和 Celery,并且用 Amazon SQS 作为消息中间件。我已经把 Feed 更新的代码设置成了一个 Celery 任务,但我找不到一个方法来防止重复更新,同时又能兼容在多个节点上运行的 Celery。
我现在的解决方案是给 Feed 模型添加一个 last_update_scheduled
属性,并每 5 分钟运行一次以下任务(伪代码):
threshold = datetime.now() - timedelta(seconds=3600)
for f in Feed.objects.filter(Q(last_update_scheduled__lt = threshold) |
Q(last_update_scheduled = None)):
updateFeed.delay(f)
f.last_update_scheduled = now
f.save()
这个方法容易出现一些同步问题。例如,如果我的任务队列积压,这个任务可能会同时运行两次,导致重复更新。我见过一些解决方案(比如 Celery 的食谱 和 Stack Overflow 上的一个改编),但 memcached 的解决方案并不可靠,比如在重启 memcached 或者它内存不足清理旧数据时,可能会出现重复更新。更不用说我不想为了一个简单的锁就把 memcached 加到我的生产环境配置里。
在理想的情况下,我希望能够这样做:
@modelTask(Feed, run_every=3600)
def updateFeed(feed):
# do something expensive
但到目前为止,我还没想到怎么实现这个装饰器。
1 个回答
0
为了让大家明白,Celery这个配方其实并不是直接在用memcached,而是用的是Django的缓存中间件。其实还有很多其他的缓存方法可以满足你的需求,而且没有memcached的一些缺点。如果你想了解更多,可以查看Django的缓存文档。