“可关闭”队列的数据类型,用于处理多个生产者和消费者的项目流

2024-06-16 09:53:54 发布

您现在位置:Python中文网/ 问答频道 /正文

是否有一种特定类型的队列是“可关闭的”,并且适用于存在多个生产者、消费者,并且数据来自一个流时(因此不知道它何时结束)?在

我找不到实现这种行为的队列,也找不到它的名称,但它似乎是生产者-消费者类型问题的完整类型。在

例如,理想情况下,我可以编写以下代码:(1)每个生产者都会在队列完成时通知队列,(2)消费者会盲目调用阻塞get(),以及(3)当所有消费者都完成了操作,并且队列为空时,所有生产者都将解除阻止并收到“完成”通知:

作为代码,应该是这样的:

def produce():
  for x in range(randint()):
    queue.put(x)
    sleep(randint())
  queue.close()  # called once for every producer

def consume():
  while True:
    try:
      print queue.get()
    except ClosedQueue:
      print 'done!'
      break

num_producers = randint()
queue = QueueTypeThatICantFigureOutANameFor(num_producers)
[Thread(target=produce).start() for _ in range(num_producers)]
[Thread(target=consume).start() for _ in range(random())

另外,我不是在寻找“毒丸”解决方案,即为每个消费者添加一个“完成”值——我不喜欢生产者不礼貌地需要知道有多少消费者。在


Tags: 代码in类型forget队列queuedef
1条回答
网友
1楼 · 发布于 2024-06-16 09:53:54

我称之为自锁队列。在

对于您的主要需求,将队列与条件变量检查结合起来,该检查在所有生产者都已腾空时优雅地锁定(关闭)队列:

class SelfLatchingQueue(LatchingQueue):
  ...
  def __init__(self, num_producers):
    ...

  def close(self):
    '''Called by a producer to indicate that it is done producing'''

    ... perhaps check that current thread is a known producer? ...

    with self.a_mutex:
      self._num_active_producers -= 1
      if self._num_active_producers <= 0:
        # Future put()s throw QueueLatched. get()s will empty the queue
        # and then throw QueueEmpty thereafter
        self.latch() # Guess what superclass implements this?

对于您的第二个需求(在最初的帖子中,完成的生产者显然会阻塞,直到所有消费者都完成),我可能会使用barrier或其他条件变量。当然,这可以在SelfLatchingQueue的子类中实现,但是在不知道代码库的情况下,我会将这种行为与自动锁存分开。在

相关问题 更多 >