twisted:如何优雅地在反应堆代码和线程代码之间通信?

2 投票
3 回答
3163 浏览
提问于 2025-04-16 02:39

我有一个客户端,它通过 twisted 连接到一个服务器。这个客户端有一个线程,可能会在后台做一些事情。当反应器(reactor)关闭时,我需要:

1) check if the thread is doing things
2) stop it if it is

有没有什么优雅的方法来做到这一点?我现在能想到的就是一些混乱的做法,比如:

def cleanup(self):
    isWorkingDF = defer.Deferred()
    doneDF = defer.Deferred()

    def checkIsWorking():
        res = self.stuff.isWorking() #blocking call
        reactor.callFromThread(isWorkingDF.callback, res)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            def shutdown():
                self.stuff.shutdown()
                reactor.callFromThread(doneDF, None)
            reactor.callInThread(shutdown)                
        else:
            doneDF.callback(None) #no shutdown needed

    isWorkingDF.addCallback(shutdownOrNot)

    reactor.callInThread(checkIsWorking)

    return doneDF

首先,我们要检查它是否在工作。这个检查的结果会放到 rescallback 里,它要么关闭,要么不关闭,然后再触发 doneDF,twisted 会在关闭之前等这个完成。

听起来挺乱的,对吧!有没有更好的方法呢?

也许还有一个相关的问题,就是有没有更优雅的方法来把回调(callbacks)串联起来?我觉得在这个完成后,我可能还需要做更多的清理工作,那样我就得创建一个不同的 done deferred,然后让当前的 doneDF 触发一个回调,做一些事情后再调用那个 done deferred……

3 个回答

0

如果程序在你关闭反应堆后就结束了,你可以把这个线程设置成守护线程。这样的话,当所有非守护线程结束后,守护线程会自动退出。只需要在调用 start() 之前,把线程对象的 daemon 属性设置为 True 就可以了。

如果这样做不行,比如说线程在退出前需要清理一些资源,那么你可以通过一个队列来让反应堆和线程之间进行沟通。把需要完成的工作放到一个队列对象里,然后让线程从队列中取出这些工作来做。可以设置一个特殊的“结束”标记(或者简单地用 None)来表示线程需要终止。

5

你可以通过使用 deferToThread 来简化这个过程,而不是使用 callInThreadcallFromThread 这两个配对。

from twisted.internet.threads import deferToThread

def cleanup(self):
    isWorkingDF = deferToThread(self.stuff.isWorking)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            return deferToThread(self.stuff.shutdown)

    isWorkingDF.addCallback(shutdownOrNot)

    return isWorkingDF

deferToThread 实际上就是对你之前在函数中实现的线程逻辑的一个简单封装,避免了重复的代码。

6

其实真正的解决办法是使用 defer.inlineCallbacks 这个装饰器。上面的代码现在变成了:

@defer.inlineCallbacks
def procShutdownStuff(self):
    isWorking = yield deferToThread(self.stuff.isWorking)

    if isWorking:
        yield deferToThread(self.stuff.shutdown)

def cleanup(self):
    return self.procShutdownStuff()

撰写回答