我已经实现了一个consumer/producer优先级队列,其中优先级实际上是一个时间戳,表示应该交付项目的时间。它运行得很好,但我想知道是否有人有更好的想法来实现这个或对当前实现的评论。在
代码是用Python编写的。创建一个线程来准时唤醒等待的消费者。我知道这是在库中创建线程的反模式,但我无法设计出其他方法。在
代码如下:
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
class Locker:
def __init__(self, lock):
self.l = lock
def __enter__(self):
self.l.acquire()
return self.l
def __exit__(self, type, value, traceback):
self.l.release()
# Optimization to avoid wasting CPU cycles when something
# is about to happen in less than 5 ms.
_RESOLUTION = 0.005
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
def put(self, when, item):
with self.Locker(self.putcond):
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
with self.Locker(self.getcond):
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
with self.Locker(self.putcond):
return len(self.queue)
def run(self):
with self.Locker(self.putcond):
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self._RESOLUTION:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.Locker(self.getcond):
self.getcond.notify()
if __name__ == "__main__":
q = TimelyQueue()
q.start()
N = 50000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
你唯一真正需要的后台线程是一个计时器,当它用完的时候踢服务员,对吗?在
首先,可以使用^{} 而不是显式的后台线程来实现它。但是,虽然这可能更简单,但它并不能真正解决这样一个问题:你在用户背后创建一个线程,不管他们是否愿意。另外,使用
threading.Timer
,实际上每次重新启动计时器时都会产生一个新线程,这可能是性能问题。(一次只能有一个线程,但启动和停止线程仍然是不可用的。)如果你看看PyPI模块、ActiveState配方和各种框架,有许多实现可以让你在一个后台线程上运行多个计时器。那就能解决你的问题了。在
但这仍然不是一个完美的解决方案。例如,假设我的应用程序需要20个
TimelyQueue
对象或一个TimelyQueue
加上19个其他所有需要计时器的东西。最后我还是有20条线。或者,假设我正在构建一个socket服务器或GUI应用程序(您的TimelyQueue
的两个最明显的用例;我可以在事件循环的顶部实现一个计时器(或者,很可能只是使用框架附带的计时器),那么我为什么需要线程呢?在解决这个问题的方法是提供一个钩子来供应任何定时器工厂:
现在,当您需要调整计时器时:
^{pr2}$对于快速和肮脏的用例,这已经足够好了。但是,如果我使用^{} ,我可以使用
TimelyQueue(twisted.reactor.callLater)
,现在队列的计时器经过twisted
事件循环。或者,如果我有一个在别处使用的多定时器单线程实现,TimelyQueue(multiTimer.add)
,现在队列的计时器与我所有其他计时器在同一个线程上。在如果你愿意的话,你可以提供一个比
threading.Timer
更好的默认值,但实际上,我认为大多数需要比threading.Timer
更好的东西的人将能够为他们的特定应用提供比你提供的更好的东西。在当然,并不是每个定时器实现都有与} 没有
threading.Timer
相同的API,尽管你会惊讶于其中有多少是这样的。但是,如果您有一个计时器要与TimelyQueue
一起使用,但它的接口错误,那么编写适配器并不困难。例如,如果我正在构建一个PyQt4/PySide应用程序,^{cancel
方法,并且需要毫秒而不是秒,因此我必须执行如下操作:或者,如果我想更直接地将队列集成到
QObject
中,我可以结束QObject.startTimer()
,并让我的timerEvent(self)
方法调用回调。在一旦你考虑适配器,最后一个想法。我认为这不值得,但可能值得考虑。如果您的计时器使用时间戳而不是timedelta,并且有一个}实现可能会更简单,也可能更高效。在
adjust
方法而不是cancel
,并且拥有自己的waketime
,那么您的{put
中,您得到了如下内容:当然,大多数计时器不提供这个接口。但是如果有人有一个,或者愿意设计一个,他们可以得到好处。对于其他人,您可以提供一个简单的适配器,将
threading.Timer
样式的计时器转换为您需要的类型,类似于:作为记录,我已经实现了你所建议的使用定时器工厂。我使用上面的版本运行了一个小的基准测试,新版本使用了
threading.Timer
类:第一次实施
在默认分辨率下(5ms,即5ms窗口内的所有内容都一起触发),它可以达到88k
put()
/秒和69kget()
/秒。当分辨率设置为0ms(无优化)时,它可以达到88k
put()
/秒和55kget()
/秒。第二次实施
在默认分辨率(5ms)下,它可以达到88k
put()
/秒和65kget()
/秒。当分辨率设置为0毫秒时,它可以达到88k
put()
/秒和62kget()
/秒。我承认我很惊讶第二个实现没有分辨率优化更快。现在调查已经太晚了。在
相关问题 更多 >
编程相关推荐