Celery能否协作运行协程作为可恢复的状态任务?
我现在正在研究Celery,打算用它来处理视频的后台工作。我的问题大致如下:
- 我有一个前端的网页服务器,它同时处理大量的视频流(大约有几千个)。
- 每个视频流必须独立并且并行处理。
- 视频流的处理可以分为两种操作:
- 逐帧操作(这些计算不需要前一帧或后一帧的信息)
- 流级操作(这些计算需要处理一部分有序的、相邻的帧)
根据第三点,我需要在整个处理过程中维护和更新一个有序的帧结构,并将这个结构的子部分的计算任务分配给Celery的工作者。最开始,我考虑的组织方式如下:
[frontend server] -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]
我的想法是celery worker 1
执行一些长时间运行的任务,这些任务主要是输入输出密集型的。简单来说,这些任务只会做以下几件事:
- 从前端服务器读取一帧
- 将这帧从base64格式解码
- 把它放入前面提到的有序数据结构中(目前是一个
collections.deque
对象)。
任何需要大量CPU计算的操作(比如图像分析)会交给celery worker 2
来处理。
我的问题是:
我想执行一个协程作为任务,这样我就可以有一个长时间运行的任务,从中可以yield
,以避免阻塞celery worker 1
的操作。换句话说,我希望能够做到类似于:
def coroutine(func):
@wraps(func)
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def my_taks():
stream = deque() # collections.deque
source = MyAsynchronousInputThingy() # something i'll make myself, probably using select
while source.open:
if source.has_data:
stream.append(Frame(source.readline())) # read data, build frame and enqueue to persistent structure
yield # cooperatively interrupt so that other tasks can execute
有没有办法让基于协程的任务无限运行,理想情况下在yield
时产生结果?
1 个回答
3
Eventlet的主要思想是,你可以像使用线程一样写同步代码,比如说,当你用socket.recv()
接收数据时,它会让当前的线程停下来,直到有数据到来。这种写法非常容易理解、维护,也方便在调试时思考。为了让事情变得高效和可扩展,Eventlet在后台做了一些“魔法”,把看似会阻塞的代码替换成绿色线程和一些机制(比如epoll、kqueue等),在合适的时机唤醒这些绿色线程。
所以,你只需要尽快执行eventlet.monkey_patch()
(比如在模块的第二行),并确保在MyInputThingy
中使用纯Python的socket操作。忘掉异步的概念,像使用线程那样写普通的阻塞代码就可以了。
Eventlet让同步代码重新变得好用。