线程等待时随机休眠带超时
在我开始描述我的问题之前,值得一提的是,我使用的是Python 2.7。我没有检查过,但这可能和Python 3.x无关。
在使用Python的队列时,我发现了一些奇怪的事情。通常,当我从队列中获取一个对象时,我会设置一个比较长但有限的超时时间(比如几秒钟),这样可以在没有找到对象时进行调试和错误报告,尤其是当我期待能找到一个对象时。我发现,有时候在一个对象被放入一个之前是空的队列和我调用get
方法获取这个对象之间,会有一个奇怪的时间间隔,尽管我在调用put
方法之前就已经调用了get
。
深入研究后,我发现这个间隔是由于“睡眠”造成的。在Queue
模块中,如果传给get
方法的timeout
参数不是None
并且是正数,那么non_empty
的Condition
的wait
方法会被调用,并且会传入一个正数参数(这并不是100%准确;实际上,Queue
的"_qsize
"方法会先被验证返回0,但只要队列一开始是空的,接下来就会调用条件的等待)。
如果Conditions
的wait
方法接收到超时参数,它的行为会有所不同。如果没有超时,它会简单地调用waiter.acquire
。这个是用C
语言定义的,超出了我的理解范围,但看起来是正常工作的。然而,如果给定了超时,就会出现一系列奇怪的“睡眠”过程,睡眠时间从某个任意值(1毫秒)开始,随着时间的推移而变长。以下是实际运行的代码:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
这显然就是我发现的新对象被放入之前空的队列和已经调用的get
方法返回这个对象之间的时间间隔的原因。由于延迟时间呈指数增长,直到被一个我认为很大的0.05秒的时间阻塞,这在我的应用程序中造成了意外且不希望的显著“睡眠”。
你能解释一下这样做的目的是什么吗?Python的开发者是否认为没有Python用户会在意这样的时间长度?有没有快速的解决方法或者合适的修复?你建议我重载线程模块吗?
2 个回答
为了确保队列没有出现奇怪的情况,你可以使用 get_nowait 方法和 Empty 异常。看看我在生产服务器上用的这些代码。(当然,这里做了修改以适应这个例子)。
from Queue import Queue, Empty
while receiver.isAlive:
try:
rec = Record(queue.get_nowait())
except Empty:
# Set someTime with the value you want
someTime = 0.1
sleep(someTime)
else:
doSomething(rec)
另外,还要记住以下几点:
time.sleep() 函数是利用操作系统的 sleep() 函数来工作的。这个函数有一些限制。例如,在标准的 Windows 系统上,最短的休眠时间是 10 到 13 毫秒。而在 Linux 系统上,休眠时间的精度通常更高,间隔一般接近 1 毫秒。
我最近也遇到了同样的问题,最后发现问题出在threading
模块的这段代码上。
真让人头疼。
你能解释一下这段代码的目的是什么吗?难道Python开发者认为没有用户会在意这样的时间长度吗?
我也不知道...
你建议我重载一下threading模块吗?
你可以选择重载threading模块,或者直接换到python3
,因为在这个版本中,这部分的实现已经修复了。
对我来说,迁移到python3会花费很多精力,所以我选择了前者。我做的事情是:
- 我用
cython
快速创建了一个.so
文件,里面有一个和pthread
的接口。这个文件包含了调用相应pthread_mutex_*
函数的Python函数,并且链接了libpthread
。特别是,和我们关注的任务最相关的函数是pthread_mutex_timedlock。 - 我创建了一个新的
threading2
模块,并把我代码里所有的import threading
都替换成import threading2
。在threading2
中,我重新定义了所有和threading
相关的类(Lock
、Condition
、Event
),还有我常用的Queue
模块里的类(Queue
和PriorityQueue
)。Lock
类是完全用pthread_mutex_*
函数重新实现的,但其他的就简单多了——我只是继承了原来的类(比如threading.Event
),然后重写了__init__
来创建我的新Lock
类型。其他的部分就正常工作了。
新的Lock
类型的实现和threading
中的原始实现非常相似,但我把新的acquire
方法的实现基于在python3
的threading
模块中找到的代码(自然,这部分比前面提到的“平衡行为”要简单得多)。这个部分相对容易。
(顺便说一下,在我的案例中,这样做让我多线程处理的速度提升了30%。比我预期的还要多。)
希望这能帮到你。