消费者/生产商“及时”queu

2024-04-19 14:50:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经实现了一个consumer/producer优先级队列,其中优先级实际上是一个时间戳,表示应该交付项目的时间。它运行得很好,但我想知道是否有人有更好的想法来实现这个或对当前实现的评论。在

代码是用Python编写的。创建一个线程来准时唤醒等待的消费者。我知道这是在库中创建线程的反模式,但我无法设计出其他方法。在

代码如下:

import collections
import heapq
import threading
import time

class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    class Locker:
        def __init__(self, lock):
            self.l = lock
        def __enter__(self):
            self.l.acquire()
            return self.l
        def __exit__(self, type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0

    def put(self, when, item):
        with self.Locker(self.putcond):
            heapq.heappush(self.queue, (when, item))
            if when < self.putwaketime or self.putwaketime == 0:
                self.putcond.notify()

    def get(self, timeout=None):
        with self.Locker(self.getcond):
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
                self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        with self.Locker(self.putcond):
            return len(self.queue)

    def run(self):
        with self.Locker(self.putcond):
            maxwait = None
            while True:
                curtime = time.time()
                try:
                    when, item = self.queue[0]
                    maxwait = when - curtime
                    self.putwaketime = when
                except IndexError:
                    maxwait = None
                    self.putwaketime = 0
                self.putcond.wait(maxwait)

                curtime = time.time()
                while True:
                    # Don't dequeue now, we are not sure to use it yet.
                    try:
                        when, item = self.queue[0]
                    except IndexError:
                        break
                    if when > curtime + self._RESOLUTION:
                        break

                    self.triggered.append(heapq.heappop(self.queue))
                if len(self.triggered) > 0:
                    with self.Locker(self.getcond):
                        self.getcond.notify()


if __name__ == "__main__":
    q = TimelyQueue()
    q.start()

    N = 50000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)

Tags: selfreturniftimequeuedefdtitem
2条回答

你唯一真正需要的后台线程是一个计时器,当它用完的时候踢服务员,对吗?在

首先,可以使用^{}而不是显式的后台线程来实现它。但是,虽然这可能更简单,但它并不能真正解决这样一个问题:你在用户背后创建一个线程,不管他们是否愿意。另外,使用threading.Timer,实际上每次重新启动计时器时都会产生一个新线程,这可能是性能问题。(一次只能有一个线程,但启动和停止线程仍然是不可用的。)

如果你看看PyPI模块、ActiveState配方和各种框架,有许多实现可以让你在一个后台线程上运行多个计时器。那就能解决你的问题了。在

但这仍然不是一个完美的解决方案。例如,假设我的应用程序需要20个TimelyQueue对象或一个TimelyQueue加上19个其他所有需要计时器的东西。最后我还是有20条线。或者,假设我正在构建一个socket服务器或GUI应用程序(您的TimelyQueue的两个最明显的用例;我可以在事件循环的顶部实现一个计时器(或者,很可能只是使用框架附带的计时器),那么我为什么需要线程呢?在

解决这个问题的方法是提供一个钩子来供应任何定时器工厂:

def __init__(self, timerfactory = threading.Timer):
    self.timerfactory = timerfactory
    ...

现在,当您需要调整计时器时:

^{pr2}$

对于快速和肮脏的用例,这已经足够好了。但是,如果我使用^{},我可以使用TimelyQueue(twisted.reactor.callLater),现在队列的计时器经过twisted事件循环。或者,如果我有一个在别处使用的多定时器单线程实现,TimelyQueue(multiTimer.add),现在队列的计时器与我所有其他计时器在同一个线程上。在

如果你愿意的话,你可以提供一个比threading.Timer更好的默认值,但实际上,我认为大多数需要比threading.Timer更好的东西的人将能够为他们的特定应用提供比你提供的更好的东西。在

当然,并不是每个定时器实现都有与threading.Timer相同的API,尽管你会惊讶于其中有多少是这样的。但是,如果您有一个计时器要与TimelyQueue一起使用,但它的接口错误,那么编写适配器并不困难。例如,如果我正在构建一个PyQt4/PySide应用程序,^{}没有cancel方法,并且需要毫秒而不是秒,因此我必须执行如下操作:

class AdaptedQTimer(object):
    def __init__(self, timeout, callback):
        self.timer = QTimer.singleShot(timeout * 1000, callback)
    def cancel(self):
        self.timer.stop()

q = TimelyQueue(AdaptedQTimer)

或者,如果我想更直接地将队列集成到QObject中,我可以结束QObject.startTimer(),并让我的timerEvent(self)方法调用回调。在

一旦你考虑适配器,最后一个想法。我认为这不值得,但可能值得考虑。如果您的计时器使用时间戳而不是timedelta,并且有一个adjust方法而不是cancel,并且拥有自己的waketime,那么您的{}实现可能会更简单,也可能更高效。在put中,您得到了如下内容:

if self.timer is None:
    self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
    self.timer.adjust(when)

当然,大多数计时器不提供这个接口。但是如果有人有一个,或者愿意设计一个,他们可以得到好处。对于其他人,您可以提供一个简单的适配器,将threading.Timer样式的计时器转换为您需要的类型,类似于:

def timerFactoryAdapter(threadingStyleTimerFactory):
    class TimerFactory(object):
        def __init__(self, timestamp, callback):
            self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
            self.callback = callback
        def cancel(self):
            return self.timer.cancel()
        def adjust(self, timestamp):
            self.timer.cancel()
            self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)

作为记录,我已经实现了你所建议的使用定时器工厂。我使用上面的版本运行了一个小的基准测试,新版本使用了threading.Timer类:

  1. 第一次实施

    • 在默认分辨率下(5ms,即5ms窗口内的所有内容都一起触发),它可以达到88kput()/秒和69kget()/秒。

    • 当分辨率设置为0ms(无优化)时,它可以达到88kput()/秒和55kget()/秒。

  2. 第二次实施

    • 在默认分辨率(5ms)下,它可以达到88kput()/秒和65kget()/秒。

    • 当分辨率设置为0毫秒时,它可以达到88kput()/秒和62kget()/秒。

我承认我很惊讶第二个实现没有分辨率优化更快。现在调查已经太晚了。在

import collections
import heapq
import threading
import time

class TimelyQueue:
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    def __init__(self, resolution=5, timerfactory=threading.Timer):
        """
        `resolution' is an optimization to avoid wasting CPU cycles when
        something is about to happen in less than X ms.
        """
        self.resolution = float(resolution) / 1000
        self.timerfactory = timerfactory
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0
        self.timer = None
        self.terminating = False

    def __arm(self):
        """
        Arm the next timer; putcond must be acquired!
        """
        curtime = time.time()
        when, item = self.queue[0]
        interval = when - curtime
        self.putwaketime = when
        self.timer = self.timerfactory(interval, self.__fire)
        self.timer.start()

    def __fire(self):
        with self.putcond:
            curtime = time.time()
            debug = 0
            while True:
                # Don't dequeue now, we are not sure to use it yet.
                try:
                    when, item = self.queue[0]
                except IndexError:
                    break
                if when > curtime + self.resolution:
                    break

                debug += 1
                self.triggered.append(heapq.heappop(self.queue))
            if len(self.triggered) > 0:
                with self.getcond:
                    self.getcond.notify(len(self.triggered))
            if self.terminating:
                return
            if len(self.queue) > 0:
                self.__arm()

    def put(self, when, item):
        """
        `when' is a Unix time from Epoch.
        """
        with self.putcond:
            heapq.heappush(self.queue, (when, item))
            if when >= self.putwaketime and self.putwaketime != 0:
                return
            # Arm next timer.
            if self.timer is not None:
                self.timer.cancel()
            self.__arm()

    def get(self, timeout=None):
        """
        Timely return the next object on the queue.
        """
        with self.getcond:
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
            self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        """
        Self explanatory.
        """
        with self.putcond:
            return len(self.queue)

    def terminate(self):
        """
        Request the embedded thread to terminate.
        """
        with self.putcond:
            self.terminating = True
            if self.timer is not None:
                self.timer.cancel()
            self.putcond.notifyAll()


if __name__ == "__main__":
    q = TimelyQueue(0)
    N = 100000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
    q.terminate()
    # Give change to the thread to exit properly, otherwise we may get
    # a stray interpreter exception.
    time.sleep(0.1)

相关问题 更多 >