维护先进先出顺序的多进程任务队列。

parallel-queue的Python项目详细描述


< P>一个数据处理管道,保证任务以相同的顺序退出流水线,但 在中间使用多处理并行化它们。您将可调用项及其参数放入 put(task, *args, **kwargs)。对get()的每次调用都将按 任务已添加,但task调用本身在多个进程中并发运行。

你用它做什么?

使我撰写本文的用例是记录包含许多可并行的大型批处理操作的完成情况。 在我们的例子中,索引大量文档。它是一个多线程系统 简化)类似于这样:

class BatchDone(int): pass  # marker for completion of an indexing batch

def producer(queue):
    while True:
        doc_count = 0
        for document in get_document_batch():
            queue.put(indexing_func, document)  # callables get executed on the arguments you supply
            doc_count += 1
        queue.put(BatchDone(doc_count))  # non-callables get passed through, but still in FIFO order

def consumer(queue):
    while True:
        result_or_batch_done = queue.get()  # either the return value of an indexing_func call, or the marker
        if isinstance(result_or_batch_done, BatchDone):
            log_progress(result_or_batch_done)

queue = ParallelQueue()

Thread(target=producer, args=(queue,)).start()
Thread(target=consumer, args=(queue,)).start()

与原始解决方案相比的优点(并行执行每个批处理,但要等到所有批处理都完成之后 启动下一批)是指一个批中的单个慢速文档不会阻止下一批启动:它 只有当内部缓冲区填满时(默认情况下,这些缓冲区可以累积为 由于有可用的CPU,许多任务都是无序的)。

快速入门指南

制作队列对象:

from parallel_queue import ParallelQueue
queue = ParallelQueue(daemon=True) # there are a bunch of optionals for internal queue and buffer sizes

将任务及其数据放入队列(这里不允许所有内容,因为可以传递的内容受到限制 到工作进程;有关详细信息,请参见下面的内容:

queue.put(task_function, "data", timeout=5)  # NOT THREADSAFE! Only do this from one thread!

然后把它们取出来(通常在put()调用的不同线程上):

queue.get(timeout=1)

完成后清理队列(这也不是线程安全的:stop()应该只从 与put()调用相同的线程:

queue.stop()  # send a "stop" signal through the pipes, but return immediately
queue.join(timeout=2)  # join all worker and consumer processes (i.e. wait for queue to clear)

如果您忘记这样做,并且您没有使用daemon=True来排队,如果它被垃圾收集,您可以 在stderr上看到这样的垃圾:

Exception AssertionError: AssertionError() in <Finalize object, dead> ignored

我不确定这有多严重,但有些事情显然不对,所以我建议你尽量避免。

最后,您可以确认关机已完成:

if queue.is_alive():
    raise Exception("join() must have timed out!")

通过管道的限制

在幕后,put()发送给工作进程的可调用任务和参数 multiprocessing标准库包。最重要的是,这意味着它们都必须是可挑选的。 所有内置类型都是可选取的,但只能选取在模块顶层定义的类和函数。 这就意味着这样的结构是行不通的:

def make_me_a_task_function(first):
    def task_function(second):
        return ' '.join((first, second))
    return task_function

queue.put(make_me_a_task_function("curried"), "eggs")  # fails

另外,令人恼火的是,在运行的解释器会话中手工定义的函数和类是not可选择的,所以如果 你必须使用内置或导入的函数。

还有一个你不太可能遇到的限制,但为了完整起见,我将其包括在内:各种限制 不允许通过酸洗在进程之间共享跨进程同步原语。

这两个问题的部分解决方案是使用略低的级别ParallelSingleTaskQueue,而不是 ParallelQueue。它在单个任务函数中进行烘焙(SO是不太灵活的),但在进程之间共享它。 继承而不是pickle(所以非顶级函数和多进程同步原语是可以的)。 请参阅使用PermutationTask类的测试,以获取此技术的示例(以及它在以下方面的缺点 可读性)。

算法

并行化的基本算法是:

===================================
Shared multiprocess-safe resources
- (shared) Queue "worker_input" (fed from producer)
- (shared) Queue "worker_output" (from worker to consumer)
- (shared) Value "max_packet_id" (highest packet id consumer is willing to accept from any worker)

Consumer private resources
  - next_packet_id (next packet id consumer wants to send on)
  - PriorityQueue "consumer_output_buffer" (buffer for packets that arrive at consumer out of order)

Worker processes loop on:
1. packet = worker_input.get()
2. result = user_supplied_task(packet)
3. while max_packet_id.value < packet.id: take a short nap
4. worker_output.put(result-with-packet-id)

Consumer process:
1. next_packet_id = 0
2. while True:
3.     max_packet_id.value = next_packet_id + consumer_output_buffer.maxsize - 1
       # e.g. suppose buffer fits 5; waiting for 0 we can safely accept 1,2,3,4
4.     while consumer_output_buffer.peek().packet_id != next_packet_id:
5.         consumer_output_buffer.add(worker_output.get())
6.     output(consumer_output_buffer.pop())
7.     next_packet_id += 1
===================================

有一些复杂情况(请参见the code了解它们产生的原因):

  • PriorityQueue没有peek()
  • 工人们不是“小睡一会儿”,而是等待消费者的Event(无论何时进行设置)
  • 不应允许无限制地增加数据包ID:它们使用模块化算法进行包装和比较
  • 我们需要能够在要求时优雅地关闭所有设备
  • 异常处理需要公开来自工作进程的堆栈跟踪
  • 可靠的测试需要一点关于算法内部进度的信息

安装

在PYPI上:

$ pip install parallel-queue

为Hackery安装

ymmv,但我是这样做的(您需要virtualenv安装,以及pip安装nose来运行测试):

$ hg clone https://bitbucket.org/tikitu/parallel_queue
$ cd parallel_queue
$ virtualenv --no-site-packages .

运行测试:

$ bin/pip install nose
$ bin/nosetests

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java FileHandle在libgdx中的行为异常   java JSONObject文本必须在1[字符2第1行]处以“{”开头,在使用HTTPClient自动化API时出现此错误   java如何删除域下不同路径的所有cookie   项目间的java Log4j共享   java propertyChangeListeners连锁反应,导致溢出   java gradle测试错误:retrolambda。oldJdk   java IDE没有给出错误,但ArrayList无法工作   web服务Java大字符串压缩安全方法   java如何从奥地利ecard将ResponseADU解码为XML?   java RxJava 2将事件并行化以执行,并产生副作用   java在jni的CallStaticObjectMethod的引用上使用DeleteLocalRef   java递归查找字符串中出现的字母数   java为什么SBT想要获得组织。scalasbt是否已安装?   java如何动态地增加布局,并知道用户点击了哪个布局?   条形图上未设置java截击响应数据