在Twisted应用中使用Celery作为控制通道
我正在尝试使用Celery作为一个Twisted应用的控制通道。我的Twisted应用是一个抽象层,提供一个标准接口来与各种本地运行的进程进行交互(通过ProcessProtocol)。我希望能够远程控制这些进程,AMQP看起来是从中心位置控制多个Twisted应用的理想方法,我想利用Celery的任务特性,比如任务重试、子任务等。
不过,这个计划并没有按我预想的那样顺利进行,我希望有人能帮我指点一下,看看怎么才能让它正常工作。
我希望在运行我的脚本时能实现以下行为:
- 启动一个稍微修改过的celeryd(见下文)
- 等待Celery任务的到来
- 当收到“启动进程”的任务时,生成一个ProcessProtocol
- 当收到其他任务时,在Twisted协议上运行一个函数,并使用Deferreds返回结果
这个“稍微修改过的celeryd”是celeryd,我做了一些小修改,让任务可以通过self.app.twisted访问Twisted反应器,通过self.app.process访问生成的进程。为了简单起见,我使用了Celery的“solo”进程池实现,这样就不会为任务工作者创建新的进程。
我的问题出现在我尝试使用Celery任务来初始化ProcessProtocol(也就是启动外部进程)时。进程确实启动了,但ProcessProtocol的childDataReceived从未被调用。我觉得这可能与文件描述符没有正确继承或设置有关。
下面是一些示例代码,基于ProcessProtocol文档中的“wc”示例。它包含两个Celery任务——一个是启动wc进程,另一个是计算一些文本中的单词数(使用之前启动的wc进程)。
这个例子有点牵强,但如果我能让它工作,将为实现我的ProcessProtocols提供一个很好的起点,这些Protocol是长时间运行的进程,会响应写入stdin的命令。
我通过先运行Celery守护进程来进行测试:
python2.6 mycelery.py -l info -P solo
然后在另一个窗口中,运行一个发送两个任务的脚本:
python2.6 command_test.py
command_test.py的预期行为是执行两个命令——一个启动wc进程,另一个将一些文本发送到CountWordsTask。实际上发生的是:
- StartProcTask生成了进程,并通过Deferred收到了“进程已启动”的响应
- CountWordsTask从未收到结果,因为childDataReceived从未被调用
有没有人能对此提供一些见解,或者给我一些建议,如何更好地使用Celery作为Twisted ProcessProtocols的控制通道?
是否更好为Celery编写一个基于Twisted的ProcessPool实现?我通过reactor.callLater调用WorkerCommand.execute_from_commandline的方法是否正确,以确保一切都在Twisted线程内发生?
我读过AMPoule,我认为它可以提供一些这样的功能,但如果可能的话,我希望继续使用Celery,因为我在应用的其他部分也在使用它。
任何帮助或建议都将不胜感激!
myceleryd.py
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if __name__ == '__main__':
main()
protocol.py
from twisted.internet import protocol
from twisted.internet.defer import Deferred
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name
def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)
def processExited(self, status):
print 'exiting'
def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars
def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']
tasks.py
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']
class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')
1 个回答
Celery在等待网络中新消息的时候可能会造成阻塞。因为你是在一个单线程的进程中运行它,同时又在使用Twisted的反应器,这样就会阻止反应器的运行。反应器是Twisted的核心部分,它需要正常运行才能发挥作用(你调用了reactor.run
,但由于Celery在阻塞它,实际上反应器并没有运行)。
reactor.callLater
只是延迟了Celery的启动。一旦Celery启动,它仍然会阻塞反应器。
你需要避免的问题就是阻塞反应器。
一个解决方案是把Celery放在一个线程里,把反应器放在另一个线程里。可以使用reactor.callFromThread
从Celery线程向Twisted发送消息(也就是“在反应器线程中调用函数”)。如果需要从Twisted线程向Celery发送消息,就用Celery的相应方法。
另一个解决方案是把Celery的协议(AMQP?可以参考txAMQP)实现为一个原生的Twisted库,这样就可以在不阻塞的情况下处理Celery消息。