使用Twisted和PB等待事件

2 投票
5 回答
2810 浏览
提问于 2025-04-16 00:18

我有一个用Python写的应用程序,它使用了多个线程。我想知道在Python中,等待某些事情发生的最佳方法是什么,这样既不会浪费CPU资源,也不会锁住全局解释器锁(GIL)。

我的应用程序使用了Twisted框架,我启动了一个线程来运行一个耗时的操作,这样就不会影响到反应线程(reactor thread)。这个耗时的操作还会使用Twisted的deferToThread来启动一些线程去做其他事情,而最初的线程想要等待这些任务的结果。

我现在的做法是这样的:

while self._waiting:
    time.sleep( 0.01 )

但是这样似乎会干扰Twisted的PB对象接收消息,所以我以为sleep会锁住GIL。不过,下面的帖子进一步调查后发现其实并不会。

其实还有更好的方法可以在不阻塞反应线程或Python的情况下等待线程。

5 个回答

5

你有没有试过使用条件变量?它们的用法是这样的:

condition = Condition()

def consumer_in_thread_A():
    condition.acquire()
    try:
        while resource_not_yet_available:
            condition.wait()
        # Here, the resource is available and may be 
        # consumed
    finally:
        condition.release()

def produce_in_thread_B():
    # ... create resource, whatsoever
    condition.acquire()
    try:
        condition.notify_all()
    finally:
        condition.release()

条件变量就像锁一样(有acquirerelease),但它们的主要作用是提供一种控制机制,让你可以wait(等待)它们被notify(通知)或者notify_all(通知所有人)。

5

我最近发现,调用 time.sleep( X ) 会锁住全局解释器锁(GIL),这会导致所有的 Python 线程在这段时间内都停下来。

你理解错了——这绝对不是它的工作方式。你是从哪里看到这个错误信息的?

不过,你在评论中澄清了(最好把你的问题编辑一下!)你在使用 deferToThread,而你遇到的问题是……:

是的,我把动作推迟到一个线程,并给 twisted 一个回调。但是父线程需要等到所有子线程完成后,才能继续处理新的子线程。

所以你可以用一个带有计数器的对象的方法作为回调——把计数器初始化为 0,每次你推迟到线程时就加 1,在回调方法中减 1。

当回调方法看到计数器减到 0 时,就知道“所有子线程都完成了”,这时就可以“开始新的子线程”,然后在这种情况下调用“生成新的子线程”的函数或方法——就这么简单!

例如(这段代码可能有错,因为没测试过,只是给你个概念)……:

class Waiter(object):

  def __init__(self, what_next, *a, **k):
    self.counter = 0
    self.what_next = what_next
    self.a = a
    self.k = k

  def one_more(self):
    self.counter += 1

  def do_wait(self, *dont_care):
    self.counter -= 1
    if self.counter == 0:
    self.what_next(*self.a, **self.k)


def spawn_one_thread(waiter, long_calculation, *a, **k):
  waiter.one_more()
  d = threads.deferToThread(long_calculation, *a, **k)
  d.addCallback(waiter.do_wait)

def spawn_all(waiter, list_of_lists_of_functions_args_and_kwds):
  if not list_of_lists_of_functions_args_and_kwds:
    return
  if waiter is None:
    waiter=Waiter(spawn_all, list_of_lists_of_functions_args_and_kwds)
  this_time = list_of_list_of_functions_args_and_kwds.pop(0)
  for f, a, k in this_time:
    spawn_one_thread(waiter, f, *a, **k)

def start_it_all(list_of_lists_of_functions_args_and_kwds):
  spawn_all(None, list_of_lists_of_functions_args_and_kwds)
13

如果你已经在使用Twisted,那么你就不需要像这样“等待”。

根据你的描述:

我创建了一个线程来运行一个长时间的操作……这个长时间的操作还使用Twisted的deferToThread创建了一些线程……

这说明你是在“长时间操作”的线程中调用deferToThread,而不是在主线程中(也就是运行reactor.run()的那个线程)。正如Jean-Paul Calderone在评论中提到的,你只能在主反应器线程中调用Twisted的API(比如deferToThread)。

你看到的锁死现象是没有遵循这个规则的常见症状。这和全局解释器锁(GIL)没有关系,完全是因为你把Twisted的反应器搞坏了。

根据你对程序的模糊描述,我尝试写了一个示例程序,完全基于Twisted的API,所有线程都是通过Twisted创建的,并且都由主反应器线程控制。

import time

from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread, blockingCallFromThread

def workReallyHard():
    "'Work' function, invoked in a thread."
    time.sleep(0.2)

def longOperation():
    for x in range(10):
        workReallyHard()
        blockingCallFromThread(reactor, startShortOperation, x)
    result = blockingCallFromThread(reactor, gatherResults, shortOperations)
    return 'hooray', result

def shortOperation(value):
    workReallyHard()
    return value * 100

shortOperations = []

def startShortOperation(value):
    def done(result):
        print 'Short operation complete!', result
        return result
    shortOperations.append(
        deferToThread(shortOperation, value).addCallback(done))

d = deferToThread(longOperation)
def allDone(result):
    print 'Long operation complete!', result
    reactor.stop()
d.addCallback(allDone)

reactor.run()

注意在allDone中反应器停止的那一点,你可以再启动另一个“长时间操作”,让它重新开始整个过程。

撰写回答