Celery能否协作运行协程作为可恢复的状态任务?

2 投票
1 回答
1122 浏览
提问于 2025-04-29 16:02

我现在正在研究Celery,打算用它来处理视频的后台工作。我的问题大致如下:

  1. 我有一个前端的网页服务器,它同时处理大量的视频流(大约有几千个)。
  2. 每个视频流必须独立并且并行处理。
  3. 视频流的处理可以分为两种操作:
    1. 逐帧操作(这些计算不需要前一帧或后一帧的信息)
    2. 流级操作(这些计算需要处理一部分有序的、相邻的帧)

根据第三点,我需要在整个处理过程中维护和更新一个有序的帧结构,并将这个结构的子部分的计算任务分配给Celery的工作者。最开始,我考虑的组织方式如下:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

我的想法是celery worker 1执行一些长时间运行的任务,这些任务主要是输入输出密集型的。简单来说,这些任务只会做以下几件事:

  1. 从前端服务器读取一帧
  2. 将这帧从base64格式解码
  3. 把它放入前面提到的有序数据结构中(目前是一个collections.deque对象)。

任何需要大量CPU计算的操作(比如图像分析)会交给celery worker 2来处理。

我的问题是:

我想执行一个协程作为任务,这样我就可以有一个长时间运行的任务,从中可以yield,以避免阻塞celery worker 1的操作。换句话说,我希望能够做到类似于:

def coroutine(func):
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start

@coroutine
def my_taks():
    stream = deque()  # collections.deque
    source = MyAsynchronousInputThingy()  # something i'll make myself, probably using select

    while source.open:
        if source.has_data:
            stream.append(Frame(source.readline()))  # read data, build frame and enqueue to persistent structure
        yield  # cooperatively interrupt so that other tasks can execute

有没有办法让基于协程的任务无限运行,理想情况下在yield时产生结果?

暂无标签

1 个回答

3

Eventlet的主要思想是,你可以像使用线程一样写同步代码,比如说,当你用socket.recv()接收数据时,它会让当前的线程停下来,直到有数据到来。这种写法非常容易理解、维护,也方便在调试时思考。为了让事情变得高效和可扩展,Eventlet在后台做了一些“魔法”,把看似会阻塞的代码替换成绿色线程和一些机制(比如epoll、kqueue等),在合适的时机唤醒这些绿色线程。

所以,你只需要尽快执行eventlet.monkey_patch()(比如在模块的第二行),并确保在MyInputThingy中使用纯Python的socket操作。忘掉异步的概念,像使用线程那样写普通的阻塞代码就可以了。

Eventlet让同步代码重新变得好用。

撰写回答