如何为Twisted的deferToThread API添加超时?
from twisted.internet import reactor from twisted.internet import threads from twisted.internet import defer import time def worker(arg): print 'Hello world' time.sleep(10) return 1 def run(): print 'Starting workers' l = [] for x in range(2): l.append(threads.deferToThread(worker, x)) return defer.DeferredList(l) def res(results): print results reactor.stop() d = run() d.addCallback(res) reactor.run()
如何通过超时来停止工作线程?
4 个回答
1
我们是这样做的,使用了一种叫做装饰器的东西。这种方法的好处是,当超时时间到达时,之前的操作会被取消。我觉得这应该成为Twisted库的一部分。
from twisted.internet import defer, reactor
def timeout(secs):
"""Decorator to add timeout to Deferred calls"""
def wrap(func):
@defer.inlineCallbacks
def _timeout(*args, **kwargs):
raw_d = func(*args, **kwargs)
if not isinstance(raw_d, defer.Deferred):
defer.returnValue(raw_d)
timeout_d = defer.Deferred()
times_up = reactor.callLater(secs, timeout_d.callback, None)
try:
raw_result, timeout_result = yield defer.DeferredList(
[raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
consumeErrors=True)
except defer.FirstError as e: # Only raw_d should raise an exception
assert e.index == 0
times_up.cancel()
e.subFailure.raiseException()
else: # timeout
if timeout_d.called:
raw_d.cancel()
raise Exception("%s secs have expired" % secs)
# no timeout
times_up.cancel()
defer.returnValue(raw_result)
return _timeout
return wrap
3
虽然我们可能无法中断线程,但可以通过 cancel
函数来停止 Deferred,我认为这个功能在 Twisted 10.1.0 及之后的版本中是可以用的。
我使用了以下这个类来创建 Deferred,如果在一段时间内没有触发,就会回调一个特定的函数。这个方法可能对那些有类似问题的人有帮助。
编辑:根据下面评论的建议,最好不要直接继承 defer.Deferred
。所以,我已经修改了代码,使用一个包装器来实现相同的效果。
class DeferredWrapperWithTimeout(object):
'''
Holds a deferred that allows a specified function to be called-back
if the deferred does not fire before some specified timeout.
'''
def __init__(self, canceller=None):
self._def = defer.Deferred(canceller)
def _finish(self, r, t):
'''
Function to be called (internally) after the Deferred
has fired, in order to cancel the timeout.
'''
if ( (t!=None) and (t.active()) ):
t.cancel()
return r
def getDeferred(self):
return self._def
def addTimeoutCallback(self, reactr, timeout,
callUponTimeout, *args, **kw):
'''
The function 'callUponTimeout' (with optional args or keywords)
will be called after 'timeout' seconds, unless the Deferred fires.
'''
def timeoutCallback():
self._def.cancel()
callUponTimeout(*args, **kw)
toc = reactr.callLater(timeout, timeoutCallback)
return self._def.addCallback(self._finish, toc)
示例:在超时之前的回调:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()
打印“called”,没有其他输出。
示例:在回调之前超时:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()
在20秒后打印“timedout”,没有其他输出。
5
线程是不能被打断的,除非它们愿意配合你。比如说,time.sleep(10)
这个命令就不会配合你,所以我觉得你无法打断这个工作线程。如果你有另一种工作线程,它有几个不同的阶段,或者在一些任务上循环执行,那么你可以尝试这样做:
def worker(stop, jobs):
for j in jobs:
if stop:
break
j.do()
stop = []
d = deferToThread(worker)
# This will make the list eval to true and break out of the loop.
stop.append(None)
这并不是说只有 Twisted 才这样。这只是 Python 中线程的工作方式。