orkan是一个管道并行化库,用python编写。

Orkan的Python项目详细描述


orkan是一个管道并行化库,用python编写。

利用一台机器的多核能力 python通常并不像它应该的那么容易。奥坎的目标是 提供一个简单的api来使用那些未充分使用的cpu 万一你的计算需要额外的马力。

代码回购:https://github.com/tobigue/Orkan

管道

管道是一系列计算,其中 一个计算是下一个的输入。奥坎允许管道 有限个元素的处理,还有 无限的元素流。不同的处理 管道中的模块可以并行化,也可以多个 每个模块的工人。

根据Storm的术语, 奥肯采用喷口和螺栓的概念。奥坎语:

pautts是将元素馈送到管道中的进程。 它们被定义为接受回调函数的函数 用于将元素传递到管道中。喷口示例 是通过http请求监听输入的函数,对 Internet,读取大文件并发送数据块以进一步 正在处理或只是将iterable的元素馈送到管道中:

big_numbers = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419] * 5

def put_primes_spout(callback):
    for n in big_numbers:
        callback(n)

bolts是管道内的进程,它可以进一步 处理。它们被定义为接受来自 上一个处理步骤并传递(可能已修改)元素 到管道中的下一个模块(或结果列表),使用 回调函数:

import math

def is_prime_bolt(n, callback):
    """From http://docs.python.org/dev/library/concurrent.futures.html"""
    if n % 2 == 0:
        callback((n, False))
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            callback((n, False))
    callback((n, True))

为了方便使用“正常”功能,还可以指定螺栓 不需要回调函数。在这种情况下,返回值 函数的值传递给管道中的下一个模块:

import math

def is_prime_bolt(n):
    """From http://docs.python.org/dev/library/concurrent.futures.html"""
    if n % 2 == 0:
        return n, False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return n, False
    return n, True

注意,喷嘴和螺栓将分开启动 python进程。也就是说,它们的输入和输出元素 要成为pickable,它们应该not interaction with non-threadsafe 主要过程中的元素。元素在 使用线程安全队列实现管道的不同模块。

使用量

这就是你如何设置和启动一个简单的管道使用喷口 以及上面定义的螺栓:

from orkan import Pipeline

pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 2)])
result = list(pipeline.start())

通过传递喷嘴列表和 螺栓。列表中的每个元素 是要执行的函数的元组和工作线程数 为该函数生成。请注意,如果运行多个 函数的工作进程结果元素的顺序可能不对应 按照各个输入元素的顺序。如果你需要联系 result元素到input元素,您应该传递输入 沿管道的元素(例如,通过在每个螺栓中返回元组)。

默认情况下,流水线是用最大的N个并行进程启动的, 其中n是计算机中的CPU数。也就是说,在 上面的例子在一个双芯机器上,第一个喷嘴和一个螺栓 正在并行运行。一旦喷口完成另一个 螺栓工被产生。在四核机器上,三个工人都会 从头并行运行。

您可以通过将n_jobs的值传递给start()

# this example corresponds to non-parallel processing
pipeline = Pipeline([(put_primes_spout, 1)], [(is_prime_bolt, 1)])
result = list(pipeline.start(n_jobs=1))

注意,如果数据输入流是无限的,则需要 每个喷口/螺栓至少有一名工人,因为没有工人会 完成,这样就不会为下面的新工人腾出一个空位。 管道。我也是个好主意 无限长管道的最后一个螺栓,否则 可能会在某个时候耗尽内存。

在管道中使用之前,你应该测试你的喷嘴和螺栓, 因为错误消息并不总是传播回主进程。

示例

示例将使用以下简单的喷嘴和螺栓:

def s(callback):
    """Simple spout that puts some random numbers into the Pipeline."""
    for _ in range(10):
        n = int(random.random() * 1000000)
        callback(n)

def b1(n):
    """Simple bolt that doubles the passed element (via return)."""
    return n * 2

def b2(n, callback):
    """Simple bolt that halves the passed element (via callback)."""
    callback(n / 2)

def v(n, callback):
    """Simple bolt for an inifinte stream of incoming data, that
    prints the result at the end of the Pipeline and does not pass
    anything on."""
    print n

有限输入

非并行处理:

pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)])
results = list(pipeline.start(n_jobs=1))

"""
    s
    |
    b1
    |
    b2
    |
    result
"""

流水线模块的并行处理:

pipeline = Pipeline([(s, 1)], [(b1, 1), (b2, 1)])
results = list(pipeline.start(n_jobs=4))

    s----b1----b2
               |
               result

B1螺栓的平行工:

pipeline = Pipeline([(s, 1)], [(b1, 2), (b2, 1)])
results = list(pipeline.start(n_jobs=4))

"""
       .-b1-------.
    s--|          |--b2
       '-------b1-'   |
                      result
"""

工人多于流程(B2工人将等待喷嘴完成):

pipeline = Pipeline([(s, 2)], [(b1, 2), (b2, 2)])
results = list(pipeline.start(n_jobs=4))

"""
    s-------.  .-b1-------.
            |--|          |-+
          s-'  '-------b1-' |
  .-b2-------.              |
+-|          |--------------+
| '-------b2-'
|
result
"""

无限输入流

源源不断的输入数据正确完成:

def s2(callback):
    """Simple spout that produces an infinite stream of random numbers."""
    while 1:
        n = int(random.random() * 1000000)
        callback(n)

pipeline = Pipeline([(s2, 1)], [(b1, 1), (v, 1)])
results = list(pipeline.start(n_jobs=4))

"""
    s2---b1----v
"""

源源不断的输入数据做错了(v workers永远不会启动):

pipeline = Pipeline([(s, 2)], [(b1, 2), (v, 2)])
results = list(pipeline.start(n_jobs=4))

"""
    s2------.  .-b1-------.
            |--|          |---#!
         s2-'  '-------b1-'
"""

测试

测试需要有nose库(pip install nose)。 安装后,可以通过执行 在源目录之外:

nosetests --exe -v

已知问题

  • 不适用于Windows

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在Spring Boot 2.2.0中添加弹性搜索?   jakarta ee如何在没有java认证的情况下停止直接访问网页(自定义标记)   java Hibernate:使用executeUpdate()的批删除未清除一级缓存   java如何在Hibernate中插入外键定义为Long的实体?   带参数的java Mockito单元测试计算器方法   java如何从Rally Rest API读取集合属性   java如何对基于消息的处理执行集成测试?   带插入排序的java排序字符串数组标记,双链表   java为什么在基于注释的Spring app@Value默认值中解析为null?   java Apache Commons Http客户端注册特定于客户端的协议   如何使用java反转字符串中n个部分的n个字符   java Tomcat在本地主机上运行良好,但在部署时出现内部服务器错误   使用信号量的变量的java结果   Java编译/运行时类路径问题   java哪个提供商负责AES/CTR/NOP添加?   伪错误解码器中的java响应未获取Zalando问题自定义属性