Python Twisted的deferToThread函数未按预期工作

2 投票
2 回答
2772 浏览
提问于 2025-04-17 07:07

我的目标是创建一个简单的服务器,处理像“处理我的字符串”这样的TCP消息,然后把“我的字符串”发送到一个比较耗时的操作(这里叫做slowFunction)去处理。我在这里通过deferToThread来调用这个函数,但似乎没有任何反应:defered的回调消息没有显示出来(调试点显示它从未被调用),而函数中的打印信息也没有显示(调试点显示它从未被调用)。

from twisted import protocols
from twisted.protocols import basic
from twisted.internet import threads, protocol, reactor
from twisted.application import service, internet
import re
import time

def slowFunction(arg):
    print "starting process"
    time.sleep(1)
    print "processed "+arg

class MySimpleServer(basic.LineReceiver):

    PROCESS_COMMAND = "process (.*)" #re pattern
    processFunction = slowFunction
    clients = []

    def connectionMade(self):
        print "Client connected"
        MySimpleServer.clients.append(self)

    def connectionLost(self, reason):
        print "Client gone"
        MySimpleServer.clients.remove(self)

    def onProcessDone(self):
        self.message("Process done")

    def onError(self):
        self.message("Process fail with error")

    def lineReceived(self, line):
        processArgumentResult = re.search(MySimpleServer.PROCESS_COMMAND, line)
        if not processArgumentResult == None:
            processArgument = processArgumentResult.groups()[0] 
            deferred = threads.deferToThread(MySimpleServer.processFunction, processArgument)
            deferred.addCallback(self.onProcessDone)
            deferred.addErrback(self.onError)
            self.message("processing your request")
        else:
            print "Unknown message line: "+line

    def message(self, message):
        self.transport.write(message + '\n')

if __name__ == '__main__':
    factory = protocol.ServerFactory()
    factory.protocol = MySimpleServer
    factory.client = []

    reactor.listenTCP(8000, factory)
    reactor.run()

2 个回答

3

你还可以用 staticmethod 来实现这个功能;这就是它的 唯一 正确用法。

class MySimpleServer(basic.LineReceiver):
    processFunction = staticmethod(slowFunction)
3

我得到了来自twisted irc的朋友们的一些帮助。

要点是:回调函数(onProcessDone和onError)应该接收一个结果参数,而由deferToThread调用的函数会接收到self作为参数(这个函数应该是MySimpleServer类中的一个方法)。

最终的代码现在是:

from twisted import protocols
from twisted.protocols import basic
from twisted.internet import threads, protocol, reactor
from twisted.application import service, internet
import re
import time

def slowFunction(arg):
    print "starting process"
    time.sleep(20)
    print "processed "+arg

class MySimpleServer(basic.LineReceiver):

    PROCESS_COMMAND = "process (.*)" #re pattern
    clients = []

    def connectionMade(self):
        print "Client connected"
        MySimpleServer.clients.append(self)

    def connectionLost(self, reason):
        print "Client gone"
        MySimpleServer.clients.remove(self)

    def onProcessDone(self, result):
        self.message("Process done")

    def onError(self, result):
        self.message("Process fail with error")

    def processFunction(self, processArgument):
        slowFunction(processArgument)

    def lineReceived(self, line):
        processArgumentResult = re.search(MySimpleServer.PROCESS_COMMAND, line)
        if not processArgumentResult == None:
            processArgument = processArgumentResult.groups()[0] 
            deferred = threads.deferToThread(self.processFunction, processArgument)
            deferred.addCallback(self.onProcessDone)
            deferred.addErrback(self.onError)
            self.message("processing your request")
        else:
            print "Unknown message line: "+line

    def message(self, message):
        self.transport.write(message + '\n')

if __name__ == '__main__':
    factory = protocol.ServerFactory()
    factory.protocol = MySimpleServer
    factory.client = []

    reactor.listenTCP(8000, factory)
    reactor.run()

撰写回答