Celery,带并发的定时任务执行

0 投票
1 回答
1076 浏览
提问于 2025-04-18 12:22

我想每秒钟启动一个定时任务,但前提是上一个任务已经结束(这是在数据库中轮询,以便将任务发送到celery)。在Celery的文档中,他们使用Django的缓存来创建一个锁。

我尝试使用了这个例子:

from __future__ import absolute_import

import datetime
import time

from celery import shared_task

from django.core.cache import cache
LOCK_EXPIRE = 60 * 5

@shared_task
def periodic():

    acquire_lock = lambda: cache.add('lock_id', 'true', LOCK_EXPIRE)
    release_lock = lambda: cache.delete('lock_id')

    a = acquire_lock()
    if a:
        try:
            time.sleep(10)
            print a, 'Hello ', datetime.datetime.now()
        finally:
            release_lock()
    else:
        print 'Ignore'

并且使用了以下配置:

app.conf.update(
    CELERY_IGNORE_RESULT=True,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERYBEAT_SCHEDULE={
        'periodic_task': {
            'task': 'app_task_management.tasks.periodic',
            'schedule': timedelta(seconds=1),
        },
    },
)

但是在控制台中,我从来没有看到Ignore的消息,而是每秒都能看到Hello。看起来这个锁没有正常工作。

我用以下方式启动定时任务:

celeryd -B -A my_app

并且用以下方式启动工作进程:

celery worker -A my_app -l info

你能帮我纠正一下我的误解吗?

1 个回答

2

来自Django缓存框架文档关于本地内存缓存的内容:

请注意,每个进程都有自己的私有缓存实例,这意味着无法进行跨进程缓存。

简单来说,你的工作进程各自管理自己的缓存。如果你需要一个资源消耗低的缓存方式,我建议使用文件缓存或数据库缓存,这两种方式都可以实现跨进程共享。

撰写回答