线程等待时随机休眠带超时

10 投票
2 回答
2969 浏览
提问于 2025-04-17 20:36

在我开始描述我的问题之前,值得一提的是,我使用的是Python 2.7。我没有检查过,但这可能和Python 3.x无关。

在使用Python的队列时,我发现了一些奇怪的事情。通常,当我从队列中获取一个对象时,我会设置一个比较长但有限的超时时间(比如几秒钟),这样可以在没有找到对象时进行调试和错误报告,尤其是当我期待能找到一个对象时。我发现,有时候在一个对象被放入一个之前是空的队列和我调用get方法获取这个对象之间,会有一个奇怪的时间间隔,尽管我在调用put方法之前就已经调用了get

深入研究后,我发现这个间隔是由于“睡眠”造成的。在Queue模块中,如果传给get方法的timeout参数不是None并且是正数,那么non_emptyConditionwait方法会被调用,并且会传入一个正数参数(这并不是100%准确;实际上,Queue的"_qsize"方法会先被验证返回0,但只要队列一开始是空的,接下来就会调用条件的等待)。

如果Conditionswait方法接收到超时参数,它的行为会有所不同。如果没有超时,它会简单地调用waiter.acquire。这个是用C语言定义的,超出了我的理解范围,但看起来是正常工作的。然而,如果给定了超时,就会出现一系列奇怪的“睡眠”过程,睡眠时间从某个任意值(1毫秒)开始,随着时间的推移而变长。以下是实际运行的代码:

# Balancing act:  We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive.  The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
    gotit = waiter.acquire(0)
    if gotit:
        break
    remaining = endtime - _time()
    if remaining <= 0:
        break
    delay = min(delay * 2, remaining, .05)
    _sleep(delay)

这显然就是我发现的新对象被放入之前空的队列和已经调用的get方法返回这个对象之间的时间间隔的原因。由于延迟时间呈指数增长,直到被一个我认为很大的0.05秒的时间阻塞,这在我的应用程序中造成了意外且不希望的显著“睡眠”。

你能解释一下这样做的目的是什么吗?Python的开发者是否认为没有Python用户会在意这样的时间长度?有没有快速的解决方法或者合适的修复?你建议我重载线程模块吗?

2 个回答

0

为了确保队列没有出现奇怪的情况,你可以使用 get_nowait 方法和 Empty 异常。看看我在生产服务器上用的这些代码。(当然,这里做了修改以适应这个例子)。

from Queue import Queue, Empty

while receiver.isAlive:
    try:
        rec = Record(queue.get_nowait())
    except Empty:
        # Set someTime with the value you want
        someTime = 0.1
        sleep(someTime)
    else:
        doSomething(rec)

另外,还要记住以下几点:

time.sleep() 函数是利用操作系统的 sleep() 函数来工作的。这个函数有一些限制。例如,在标准的 Windows 系统上,最短的休眠时间是 10 到 13 毫秒。而在 Linux 系统上,休眠时间的精度通常更高,间隔一般接近 1 毫秒。

5

我最近也遇到了同样的问题,最后发现问题出在threading模块的这段代码上。

真让人头疼。


你能解释一下这段代码的目的是什么吗?难道Python开发者认为没有用户会在意这样的时间长度吗?

我也不知道...


你建议我重载一下threading模块吗?

你可以选择重载threading模块,或者直接换到python3,因为在这个版本中,这部分的实现已经修复了。

对我来说,迁移到python3会花费很多精力,所以我选择了前者。我做的事情是:

  1. 我用cython快速创建了一个.so文件,里面有一个和pthread的接口。这个文件包含了调用相应pthread_mutex_*函数的Python函数,并且链接了libpthread。特别是,和我们关注的任务最相关的函数是pthread_mutex_timedlock
  2. 我创建了一个新的threading2模块,并把我代码里所有的import threading都替换成import threading2。在threading2中,我重新定义了所有和threading相关的类(LockConditionEvent),还有我常用的Queue模块里的类(QueuePriorityQueue)。Lock类是完全用pthread_mutex_*函数重新实现的,但其他的就简单多了——我只是继承了原来的类(比如threading.Event),然后重写了__init__来创建我的新Lock类型。其他的部分就正常工作了。

新的Lock类型的实现和threading中的原始实现非常相似,但我把新的acquire方法的实现基于在python3threading模块中找到的代码(自然,这部分比前面提到的“平衡行为”要简单得多)。这个部分相对容易。

(顺便说一下,在我的案例中,这样做让我多线程处理的速度提升了30%。比我预期的还要多。)

希望这能帮到你。

撰写回答