维护先进先出顺序的多进程任务队列。
parallel-queue的Python项目详细描述
你用它做什么?
使我撰写本文的用例是记录包含许多可并行的大型批处理操作的完成情况。 在我们的例子中,索引大量文档。它是一个多线程系统 简化)类似于这样:
class BatchDone(int): pass # marker for completion of an indexing batch def producer(queue): while True: doc_count = 0 for document in get_document_batch(): queue.put(indexing_func, document) # callables get executed on the arguments you supply doc_count += 1 queue.put(BatchDone(doc_count)) # non-callables get passed through, but still in FIFO order def consumer(queue): while True: result_or_batch_done = queue.get() # either the return value of an indexing_func call, or the marker if isinstance(result_or_batch_done, BatchDone): log_progress(result_or_batch_done) queue = ParallelQueue() Thread(target=producer, args=(queue,)).start() Thread(target=consumer, args=(queue,)).start()
与原始解决方案相比的优点(并行执行每个批处理,但要等到所有批处理都完成之后 启动下一批)是指一个批中的单个慢速文档不会阻止下一批启动:它 只有当内部缓冲区填满时(默认情况下,这些缓冲区可以累积为 由于有可用的CPU,许多任务都是无序的)。
快速入门指南
制作队列对象:
from parallel_queue import ParallelQueue queue = ParallelQueue(daemon=True) # there are a bunch of optionals for internal queue and buffer sizes
将任务及其数据放入队列(这里不允许所有内容,因为可以传递的内容受到限制 到工作进程;有关详细信息,请参见下面的内容:
queue.put(task_function, "data", timeout=5) # NOT THREADSAFE! Only do this from one thread!
然后把它们取出来(通常在put()调用的不同线程上):
queue.get(timeout=1)
完成后清理队列(这也不是线程安全的:stop()应该只从 与put()调用相同的线程:
queue.stop() # send a "stop" signal through the pipes, but return immediately queue.join(timeout=2) # join all worker and consumer processes (i.e. wait for queue to clear)
如果您忘记这样做,并且您没有使用daemon=True来排队,如果它被垃圾收集,您可以 在stderr上看到这样的垃圾:
Exception AssertionError: AssertionError() in <Finalize object, dead> ignored
我不确定这有多严重,但有些事情显然不对,所以我建议你尽量避免。
最后,您可以确认关机已完成:
if queue.is_alive(): raise Exception("join() must have timed out!")
通过管道的限制
在幕后,put()发送给工作进程的可调用任务和参数 multiprocessing标准库包。最重要的是,这意味着它们都必须是可挑选的。 所有内置类型都是可选取的,但只能选取在模块顶层定义的类和函数。 这就意味着这样的结构是行不通的:
def make_me_a_task_function(first): def task_function(second): return ' '.join((first, second)) return task_function queue.put(make_me_a_task_function("curried"), "eggs") # fails
另外,令人恼火的是,在运行的解释器会话中手工定义的函数和类是not可选择的,所以如果 你必须使用内置或导入的函数。
还有一个你不太可能遇到的限制,但为了完整起见,我将其包括在内:各种限制 不允许通过酸洗在进程之间共享跨进程同步原语。
这两个问题的部分解决方案是使用略低的级别ParallelSingleTaskQueue,而不是 ParallelQueue。它在单个任务函数中进行烘焙(SO是不太灵活的),但在进程之间共享它。 继承而不是pickle(所以非顶级函数和多进程同步原语是可以的)。 请参阅使用PermutationTask类的测试,以获取此技术的示例(以及它在以下方面的缺点 可读性)。
算法
并行化的基本算法是:
=================================== Shared multiprocess-safe resources - (shared) Queue "worker_input" (fed from producer) - (shared) Queue "worker_output" (from worker to consumer) - (shared) Value "max_packet_id" (highest packet id consumer is willing to accept from any worker) Consumer private resources - next_packet_id (next packet id consumer wants to send on) - PriorityQueue "consumer_output_buffer" (buffer for packets that arrive at consumer out of order) Worker processes loop on: 1. packet = worker_input.get() 2. result = user_supplied_task(packet) 3. while max_packet_id.value < packet.id: take a short nap 4. worker_output.put(result-with-packet-id) Consumer process: 1. next_packet_id = 0 2. while True: 3. max_packet_id.value = next_packet_id + consumer_output_buffer.maxsize - 1 # e.g. suppose buffer fits 5; waiting for 0 we can safely accept 1,2,3,4 4. while consumer_output_buffer.peek().packet_id != next_packet_id: 5. consumer_output_buffer.add(worker_output.get()) 6. output(consumer_output_buffer.pop()) 7. next_packet_id += 1 ===================================
有一些复杂情况(请参见the code了解它们产生的原因):
- PriorityQueue没有peek()
- 工人们不是“小睡一会儿”,而是等待消费者的Event(无论何时进行设置)
- 不应允许无限制地增加数据包ID:它们使用模块化算法进行包装和比较
- 我们需要能够在要求时优雅地关闭所有设备
- 异常处理需要公开来自工作进程的堆栈跟踪
- 可靠的测试需要一点关于算法内部进度的信息
安装
在PYPI上:
$ pip install parallel-queue
为Hackery安装
ymmv,但我是这样做的(您需要virtualenv安装,以及pip安装nose来运行测试):
$ hg clone https://bitbucket.org/tikitu/parallel_queue $ cd parallel_queue $ virtualenv --no-site-packages .
运行测试:
$ bin/pip install nose $ bin/nosetests