VimAP
vimap的Python项目详细描述
IMAP上的变化,而不是C
^ {TT1}$包被设计成提供更灵活的替代方案。 对于multiprocessing.imap_unordered。(你应该读 multiprocessingdocumentation如果您还没有,或者 自述文件没有意义!)它渴望支持类似http的客户端 处理数据,但不包含特定于客户端的内容。
特别是什么使它更灵活?- 它有助于进程的自定义初始化(可以连接 到http客户端等),并进行清理。
- 它有助于将配置传递给进程,通常 多处理,一个结束[浪费]元组配置 每个输入的值。
- 它有助于传递不可序列化的对象(在进程 分叉)
- 允许超时和重试失败的任务或进程
我们希望做什么比常规的^{tt3}更好$ 图书馆?
- 您不必通过黑客来实现自定义进程初始化[ 到岸价http://stackoverflow.com/a/10118250/81636]
- api有助于防止愚蠢的错误,比如在 你已经定义了相关的函数。[ http://stackoverflow.com/q/2782961/81636]
- 旨在获得更好的工作异常处理-多处理将 把死掉的工人进程留在周围;我们的目标不是。
- 常见用例的集合(从文件中读取等)
其他功能/设计决定:
- 通过预先将任务排队,试图让所有工作人员保持忙碌状态
- 而且,它可能不会影响您,但是这个库也可以 解决Python2.6.7中的一个错误[ http://stackoverflow.com/q/16684900/81636。
定义工作函数
vimap通过修饰提供其自定义初始化等 功能。如果您的输入是http请求,并且您希望 从一组服务器中的任何一个响应,您可以表达您的程序 因此(使用requestshttp库–它很直观,所以 可能不需要阅读它的文档),
import vimap.worker_process @vimap.worker_process.worker def send_reqests_worker(requests, server): s = requests.Session() for request in requests: yield s.post('http://{0}{1}'.format(server, request.uri), data=request.data) s.close()
怎么回事?当工作进程启动时,新会话是 开的。每个请求(包含.uri和 .data)由父进程发送,并发送到服务器。那么, worker产生一个single响应,该响应被发送回 父进程。
从父进程映射数据
让我们继续这个例子,
import vimap.pool pool = vimap.pool.fork(send_requests_worker.init_args(server=server) for server in my_servers)
这将初始化一个工人池。每个都有一个绑定参数 server。当工作进程启动时,它们开始运行,直到 他们试图从requestsiterator中提取一个元素;然后 必须暂停父进程才能发送数据。父进程可以 像这样发送数据,
Request = namedtuple("Request", ["uri", "data"]) pool.imap(Request(**ujson.loads(line)) for line in fileinput.input()).block_ignore_output()
它从包含json输入的文件中读取行,并发送 工人的条目。在现实世界中,你可能想 工人执行json加载。.block_ignore_output()将 使整个iterable(输入文件)被读取,并且[默认情况下] 做完后把游泳池关上。
imap
输入活页夹
imap元组的第一个变量是带输出的输入。所以,你已经 一些[惰性]输入,
x1, x2, x3, x4, x5, ...
当你用一些函数f来vimap这个函数时,你会得到 元组,
(x1, f(x1)), (x2, f(x2)), ...
可能不合适。既然是流媒体,就不需要 长时间围绕输入–输入活页夹将保持在o左右(# 进程)输入,因此可以安全地遍历大型输入。在 代码,这可能看起来像,
for input, output in pool.imap(iterable).zip_in_out(): results[input] = output # do some more processing
处理异常
如果您想优雅地处理冒泡到main的异常 工作进程的函数,您可以请求vimap返回 它从工人那里得到的任何例外。
for input, output, typ in pool.imap(iterable).zip_in_out_typ(): if typ == 'exception': print('Worker had an exception:') print(output.formatted_traceback) elif typ == 'output': print('I got some actual output from a worker!') print(output)
^如果返回类型为 是exception;其中包含引发的异常的value,并且 formatted_tracebackstring的回溯 打印到stderr。