在Twisted应用中使用Celery作为控制通道

12 投票
1 回答
3324 浏览
提问于 2025-04-17 06:21

我正在尝试使用Celery作为一个Twisted应用的控制通道。我的Twisted应用是一个抽象层,提供一个标准接口来与各种本地运行的进程进行交互(通过ProcessProtocol)。我希望能够远程控制这些进程,AMQP看起来是从中心位置控制多个Twisted应用的理想方法,我想利用Celery的任务特性,比如任务重试、子任务等。

不过,这个计划并没有按我预想的那样顺利进行,我希望有人能帮我指点一下,看看怎么才能让它正常工作。

我希望在运行我的脚本时能实现以下行为:

  • 启动一个稍微修改过的celeryd(见下文)
  • 等待Celery任务的到来
  • 当收到“启动进程”的任务时,生成一个ProcessProtocol
  • 当收到其他任务时,在Twisted协议上运行一个函数,并使用Deferreds返回结果

这个“稍微修改过的celeryd”是celeryd,我做了一些小修改,让任务可以通过self.app.twisted访问Twisted反应器,通过self.app.process访问生成的进程。为了简单起见,我使用了Celery的“solo”进程池实现,这样就不会为任务工作者创建新的进程。

我的问题出现在我尝试使用Celery任务来初始化ProcessProtocol(也就是启动外部进程)时。进程确实启动了,但ProcessProtocol的childDataReceived从未被调用。我觉得这可能与文件描述符没有正确继承或设置有关。

下面是一些示例代码,基于ProcessProtocol文档中的“wc”示例。它包含两个Celery任务——一个是启动wc进程,另一个是计算一些文本中的单词数(使用之前启动的wc进程)。

这个例子有点牵强,但如果我能让它工作,将为实现我的ProcessProtocols提供一个很好的起点,这些Protocol是长时间运行的进程,会响应写入stdin的命令。

我通过先运行Celery守护进程来进行测试:

python2.6 mycelery.py -l info -P solo

然后在另一个窗口中,运行一个发送两个任务的脚本:

python2.6 command_test.py

command_test.py的预期行为是执行两个命令——一个启动wc进程,另一个将一些文本发送到CountWordsTask。实际上发生的是:

  • StartProcTask生成了进程,并通过Deferred收到了“进程已启动”的响应
  • CountWordsTask从未收到结果,因为childDataReceived从未被调用

有没有人能对此提供一些见解,或者给我一些建议,如何更好地使用Celery作为Twisted ProcessProtocols的控制通道?

是否更好为Celery编写一个基于Twisted的ProcessPool实现?我通过reactor.callLater调用WorkerCommand.execute_from_commandline的方法是否正确,以确保一切都在Twisted线程内发生?

我读过AMPoule,我认为它可以提供一些这样的功能,但如果可能的话,我希望继续使用Celery,因为我在应用的其他部分也在使用它。

任何帮助或建议都将不胜感激!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()

if __name__ == '__main__':
    main()

protocol.py

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text
        self._waiting = {} # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        fieldLength = len(data) / 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)

        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'


    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        self.transport.write(text)
        return self._waiting['countWords']

tasks.py

from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']

class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')

1 个回答

12

Celery在等待网络中新消息的时候可能会造成阻塞。因为你是在一个单线程的进程中运行它,同时又在使用Twisted的反应器,这样就会阻止反应器的运行。反应器是Twisted的核心部分,它需要正常运行才能发挥作用(你调用了reactor.run,但由于Celery在阻塞它,实际上反应器并没有运行)。

reactor.callLater只是延迟了Celery的启动。一旦Celery启动,它仍然会阻塞反应器。

你需要避免的问题就是阻塞反应器。

一个解决方案是把Celery放在一个线程里,把反应器放在另一个线程里。可以使用reactor.callFromThread从Celery线程向Twisted发送消息(也就是“在反应器线程中调用函数”)。如果需要从Twisted线程向Celery发送消息,就用Celery的相应方法。

另一个解决方案是把Celery的协议(AMQP?可以参考txAMQP)实现为一个原生的Twisted库,这样就可以在不阻塞的情况下处理Celery消息。

撰写回答