orkan是一个管道并行化库,用python编写。
Orkan的Python项目详细描述
orkan是一个管道并行化库,用python编写。
利用一台机器的多核能力 python通常并不像它应该的那么容易。奥坎的目标是 提供一个简单的api来使用那些未充分使用的cpu 万一你的计算需要额外的马力。
代码回购:https://github.com/tobigue/Orkan
管道
管道是一系列计算,其中 一个计算是下一个的输入。奥坎允许管道 有限个元素的处理,还有 无限的元素流。不同的处理 管道中的模块可以并行化,也可以多个 每个模块的工人。
根据Storm的术语, 奥肯采用喷口和螺栓的概念。奥坎语:
pautts是将元素馈送到管道中的进程。 它们被定义为接受回调函数的函数 用于将元素传递到管道中。喷口示例 是通过http请求监听输入的函数,对 Internet,读取大文件并发送数据块以进一步 正在处理或只是将iterable的元素馈送到管道中:
big_numbers = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] * 5 def put_primes_spout(callback): for n in big_numbers: callback(n)
bolts是管道内的进程,它可以进一步 处理。它们被定义为接受来自 上一个处理步骤并传递(可能已修改)元素 到管道中的下一个模块(或结果列表),使用 回调函数:
import math def is_prime_bolt(n, callback): """From http://docs.python.org/dev/library/concurrent.futures.html""" if n % 2 == 0: callback((n, False)) sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: callback((n, False)) callback((n, True))
为了方便使用“正常”功能,还可以指定螺栓 不需要回调函数。在这种情况下,返回值 函数的值传递给管道中的下一个模块:
import math def is_prime_bolt(n): """From http://docs.python.org/dev/library/concurrent.futures.html""" if n % 2 == 0: return n, False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return n, False return n, True
注意,喷嘴和螺栓将分开启动 python进程。也就是说,它们的输入和输出元素 要成为pickable,它们应该not interaction with non-threadsafe 主要过程中的元素。元素在 使用线程安全队列实现管道的不同模块。
使用量
这就是你如何设置和启动一个简单的管道使用喷口 以及上面定义的螺栓:
from orkan import Pipeline pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 2)]) result = list(pipeline.start())
通过传递喷嘴列表和 螺栓。列表中的每个元素 是要执行的函数的元组和工作线程数 为该函数生成。请注意,如果运行多个 函数的工作进程结果元素的顺序可能不对应 按照各个输入元素的顺序。如果你需要联系 result元素到input元素,您应该传递输入 沿管道的元素(例如,通过在每个螺栓中返回元组)。
默认情况下,流水线是用最大的N个并行进程启动的, 其中n是计算机中的CPU数。也就是说,在 上面的例子在一个双芯机器上,第一个喷嘴和一个螺栓 正在并行运行。一旦喷口完成另一个 螺栓工被产生。在四核机器上,三个工人都会 从头并行运行。
您可以通过将n_jobs的值传递给start():
# this example corresponds to non-parallel processing pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 1)]) result = list(pipeline.start(n_jobs=1))
注意,如果数据输入流是无限的,则需要 每个喷口/螺栓至少有一名工人,因为没有工人会 完成,这样就不会为下面的新工人腾出一个空位。 管道。我也是个好主意 无限长管道的最后一个螺栓,否则 可能会在某个时候耗尽内存。
在管道中使用之前,你应该测试你的喷嘴和螺栓, 因为错误消息并不总是传播回主进程。
示例
示例将使用以下简单的喷嘴和螺栓:
def s(callback): """Simple spout that puts some random numbers into the Pipeline.""" for _ in range(10): n = int(random.random() * 1000000) callback(n) def b1(n): """Simple bolt that doubles the passed element (via return).""" return n * 2 def b2(n, callback): """Simple bolt that halves the passed element (via callback).""" callback(n / 2) def v(n, callback): """Simple bolt for an inifinte stream of incoming data, that prints the result at the end of the Pipeline and does not pass anything on.""" print n
有限输入
非并行处理:
pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)]) results = list(pipeline.start(n_jobs=1)) """ s | b1 | b2 | result """
流水线模块的并行处理:
pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)]) results = list(pipeline.start(n_jobs=4)) s----b1----b2 | result
B1螺栓的平行工:
pipeline = Pipeline([(s, 1)], [(b1, 2), (b2, 1)]) results = list(pipeline.start(n_jobs=4)) """ .-b1-------. s--| |--b2 '-------b1-' | result """
工人多于流程(B2工人将等待喷嘴完成):
pipeline = Pipeline([(s, 2)], [(b1, 2), (b2, 2)]) results = list(pipeline.start(n_jobs=4)) """ s-------. .-b1-------. |--| |-+ s-' '-------b1-' | .-b2-------. | +-| |--------------+ | '-------b2-' | result """
无限输入流
源源不断的输入数据正确完成:
def s2(callback): """Simple spout that produces an infinite stream of random numbers.""" while 1: n = int(random.random() * 1000000) callback(n) pipeline = Pipeline([(s2, 1)], [(b1, 1), (v, 1)]) results = list(pipeline.start(n_jobs=4)) """ s2---b1----v """
源源不断的输入数据做错了(v workers永远不会启动):
pipeline = Pipeline([(s, 2)], [(b1, 2), (v, 2)]) results = list(pipeline.start(n_jobs=4)) """ s2------. .-b1-------. |--| |---#! s2-' '-------b1-' """
测试
测试需要有nose库(pip install nose)。 安装后,可以通过执行 在源目录之外:
nosetests --exe -v
已知问题
- 不适用于Windows