芹菜任务时间表(确保一个任务一次只能执行一个)

2024-04-25 22:57:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我有个任务,有点像这样:

@task()
def async_work(info):
    ...

任何时候,我都可以用一些信息调用async_work。出于某种原因,我需要确保一次只运行一个异步工作,其他调用请求必须等待。

所以我提出了以下代码:

is_locked = False    
@task()
    def async_work(info):
        while is_locked:
            pass
        is_locked = True
        ...
        is_locked = False

但是它说访问局部变量是无效的。。。 怎么解决?


Tags: 代码info信息falsetruetaskasyncis
3条回答

您可能不想为芹菜工人使用concurrency=1-您希望同时处理您的任务。相反,你可以使用某种锁定机制。只需确保缓存的超时时间大于完成任务的时间。

Redis公司

import redis
from contextlib import contextmanager

redis_client = redis.Redis(host='localhost', port=6378)


@contextmanager
def redis_lock(lock_name):
    """Yield 1 if specified lock_name is not already set in redis. Otherwise returns 0.

    Enables sort of lock functionality.
    """
    status = redis_client.set(lock_name, 'lock', nx=True)
    try:
        yield status
    finally:
        redis_client.delete(lock_name)


@task()
def async_work(info):
    with redis_lock('my_lock_name') as acquired:
        do_some_work()

内存缓存

celery documentation启发的示例

from contextlib import contextmanager
from django.core.cache import cache

@contextmanager
def memcache_lock(lock_name):
    status = cache.add(lock_name, 'lock')
    try:
        yield status
    finally:
        cache.delete(lock_name)


@task()
def async_work(info):
    with memcache_lock('my_lock_name') as acquired:
        do_some_work() 

访问局部变量是无效的,因为您可以让几个芹菜工人运行任务。这些工人甚至可能在不同的主机上。因此,基本上,变量实例的数量与芹菜工人运行的数量一样多 你的任务。因此,即使你的代码不会引发任何错误,你也不会得到期望的效果。

要实现您的目标,您需要将芹菜配置为只运行一个工人。因为任何工人都可以在任何给定的时间处理单个任务,所以您可以得到所需的内容。

编辑:

根据Workers Guide > Concurrency

By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.

因此,您需要像这样运行worker:

$ celery worker --concurrency=1

编辑2:

令人惊讶的是,还有另一个解决方案,而且甚至在官方文档中也有,请参阅Ensuring a task is only executed one at a time文章。

我已经实现了一个装饰器来处理这个问题。这是基于官方芹菜文件的Ensuring a task is only executed one at a time

它使用函数名及其参数和kwargs创建一个lock_id,该id在Django的缓存层中设置/获取(我只在Memcached中测试过,但它也应该在Redis中使用)。如果已在缓存中设置了lock_id,则会将任务放回队列并退出。

CACHE_LOCK_EXPIRE = 30


def no_simultaneous_execution(f):
    """
    Decorator that prevents a task form being executed with the
    same *args and **kwargs more than one at a time.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create lock_id used as cache key
        lock_id = '{}-{}-{}'.format(self.name, args, kwargs)

        # Timeout with a small diff, so we'll leave the lock delete
        # to the cache if it's close to being auto-removed/expired
        timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3

        # Try to acquire a lock, or put task back on queue
        lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
        if not lock_acquired:
            self.apply_async(args=args, kwargs=kwargs, countdown=3)
            return

        try:
            f(self, *args, **kwargs)
        finally:
            # Release the lock
            if monotonic() < timeout_at:
                cache.delete(lock_id)
    return wrapper

然后,您可以将它作为第一个装饰器应用于任何任务:

@shared_task(bind=True, base=MyTask)
@no_simultaneous_execution
def sometask(self, some_arg):
  ...

相关问题 更多 >