Queue and JoinableQueue in Python

Published 2024-04-26 15:02:02


In Python, when using the multiprocessing module, there are two kinds of queues:

  • Queue
  • JoinableQueue

What is the difference between them?

Queue

from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue

JoinableQueue

from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion

2 Answers

JoinableQueue has the methods join() and task_done(), while Queue does not. From the docs:


class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

The usual Queue.Empty and Queue.Full exceptions from the standard library’s Queue module are raised to signal timeouts.

Queue implements all the methods of Queue.Queue except for task_done() and join().


class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.


If you use JoinableQueue, then you must call JoinableQueue.task_done() for each task removed from the queue, or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
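The bookkeeping has to balance exactly: as the docs note, calling task_done() more times than items were put raises ValueError. A quick demonstration:

```python
from multiprocessing import JoinableQueue

q = JoinableQueue()
q.put("job")
assert q.get() == "job"
q.task_done()          # balances the single put()
try:
    q.task_done()      # one call too many
    raised = False
except ValueError:
    raised = True
print(raised)  # → True
```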

Based on the docs, it's hard to be sure when a Queue is actually empty. With a JoinableQueue you can wait for the queue to drain by calling q.join(). This helps in cases where you want to complete work in distinct batches and do something discrete at the end of each batch.

For example, perhaps you process 1000 items at a time through the queue, then send a push notification to your users that another batch has completed. This would be challenging to implement with a plain Queue.

It might look something like this:

import logging
import multiprocessing as mp

logger = logging.getLogger(__name__)

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

# `process`, `URLS`, `expensive_func`, and `notify_users` are assumed to be
# defined elsewhere.
def consume(q):
  for item in iter(q.get, STOP_VALUE):
    try:
      process(item)
    # Be very defensive about errors since they can corrupt pipes.
    except Exception as e:
      logger.error(e)
    finally:
      q.task_done()

# A Manager queue can be passed to Pool workers as an argument;
# a plain JoinableQueue cannot.
q = mp.Manager().JoinableQueue()
with mp.Pool() as pool:
  # Pull items off the queue as fast as we can, whenever they're ready.
  for _ in range(mp.cpu_count()):
    pool.apply_async(consume, (q,))
  for i in range(0, len(URLS), BATCH_SIZE):
    # Put `BATCH_SIZE` items on the queue asynchronously. map_async's
    # callback receives the whole result list at once, so enqueue the
    # results one by one.
    pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE],
                   callback=lambda results: [q.put(r) for r in results])
    # Wait for the queue to empty.
    q.join()
    notify_users()
  # Stop the consumers so we can exit cleanly.
  for _ in range(mp.cpu_count()):
    q.put(STOP_VALUE)
Note: I haven't actually run this code. If you pull items off the queue faster than you put them on, you might finish early. In that scenario this code sends an update at least every 1000 items, and possibly more often. For progress updates, that's probably fine. If hitting exactly 1000 matters, you could use an mp.Value('i', 0) and check that it equals 1000 whenever join releases.
