使用Twisted和PB等待事件
我有一个用Python写的应用程序,它使用了多个线程。我想知道在Python中,等待某些事情发生的最佳方法是什么,这样既不会浪费CPU资源,也不会锁住全局解释器锁(GIL)。
我的应用程序使用了Twisted框架,我启动了一个线程来运行一个耗时的操作,这样就不会影响到反应线程(reactor thread)。这个耗时的操作还会使用Twisted的deferToThread来启动一些线程去做其他事情,而最初的线程想要等待这些任务的结果。
我现在的做法是这样的:
while self._waiting:
time.sleep( 0.01 )
但是这样似乎会干扰Twisted的PB对象接收消息,所以我以为sleep会锁住GIL。不过,下面的帖子进一步调查后发现其实并不会。
其实还有更好的方法可以在不阻塞反应线程或Python的情况下等待线程。
5 个回答
你有没有试过使用条件变量?它们的用法是这样的:
condition = Condition()
def consumer_in_thread_A():
condition.acquire()
try:
while resource_not_yet_available:
condition.wait()
# Here, the resource is available and may be
# consumed
finally:
condition.release()
def produce_in_thread_B():
# ... create resource, whatsoever
condition.acquire()
try:
condition.notify_all()
finally:
condition.release()
条件变量就像锁一样(有acquire
和release
),但它们的主要作用是提供一种控制机制,让你可以wait
(等待)它们被notify
(通知)或者notify_all
(通知所有人)。
我最近发现,调用
time.sleep( X )
会锁住全局解释器锁(GIL),这会导致所有的 Python 线程在这段时间内都停下来。
你理解错了——这绝对不是它的工作方式。你是从哪里看到这个错误信息的?
不过,你在评论中澄清了(最好把你的问题编辑一下!)你在使用 deferToThread
,而你遇到的问题是……:
是的,我把动作推迟到一个线程,并给 twisted 一个回调。但是父线程需要等到所有子线程完成后,才能继续处理新的子线程。
所以你可以用一个带有计数器的对象的方法作为回调——把计数器初始化为 0,每次你推迟到线程时就加 1,在回调方法中减 1。
当回调方法看到计数器减到 0 时,就知道“所有子线程都完成了”,这时就可以“开始新的子线程”,然后在这种情况下调用“生成新的子线程”的函数或方法——就这么简单!
例如(这段代码可能有错,因为没测试过,只是给你个概念)……:
class Waiter(object):
def __init__(self, what_next, *a, **k):
self.counter = 0
self.what_next = what_next
self.a = a
self.k = k
def one_more(self):
self.counter += 1
def do_wait(self, *dont_care):
self.counter -= 1
if self.counter == 0:
self.what_next(*self.a, **self.k)
def spawn_one_thread(waiter, long_calculation, *a, **k):
waiter.one_more()
d = threads.deferToThread(long_calculation, *a, **k)
d.addCallback(waiter.do_wait)
def spawn_all(waiter, list_of_lists_of_functions_args_and_kwds):
if not list_of_lists_of_functions_args_and_kwds:
return
if waiter is None:
waiter=Waiter(spawn_all, list_of_lists_of_functions_args_and_kwds)
this_time = list_of_list_of_functions_args_and_kwds.pop(0)
for f, a, k in this_time:
spawn_one_thread(waiter, f, *a, **k)
def start_it_all(list_of_lists_of_functions_args_and_kwds):
spawn_all(None, list_of_lists_of_functions_args_and_kwds)
如果你已经在使用Twisted,那么你就不需要像这样“等待”。
根据你的描述:
我创建了一个线程来运行一个长时间的操作……这个长时间的操作还使用Twisted的deferToThread创建了一些线程……
这说明你是在“长时间操作”的线程中调用deferToThread
,而不是在主线程中(也就是运行reactor.run()
的那个线程)。正如Jean-Paul Calderone在评论中提到的,你只能在主反应器线程中调用Twisted的API(比如deferToThread
)。
你看到的锁死现象是没有遵循这个规则的常见症状。这和全局解释器锁(GIL)没有关系,完全是因为你把Twisted的反应器搞坏了。
根据你对程序的模糊描述,我尝试写了一个示例程序,完全基于Twisted的API,所有线程都是通过Twisted创建的,并且都由主反应器线程控制。
import time
from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread, blockingCallFromThread
def workReallyHard():
"'Work' function, invoked in a thread."
time.sleep(0.2)
def longOperation():
for x in range(10):
workReallyHard()
blockingCallFromThread(reactor, startShortOperation, x)
result = blockingCallFromThread(reactor, gatherResults, shortOperations)
return 'hooray', result
def shortOperation(value):
workReallyHard()
return value * 100
shortOperations = []
def startShortOperation(value):
def done(result):
print 'Short operation complete!', result
return result
shortOperations.append(
deferToThread(shortOperation, value).addCallback(done))
d = deferToThread(longOperation)
def allDone(result):
print 'Long operation complete!', result
reactor.stop()
d.addCallback(allDone)
reactor.run()
注意在allDone
中反应器停止的那一点,你可以再启动另一个“长时间操作”,让它重新开始整个过程。