多工作管道和可关闭队列
gevent-pipeline的Python项目详细描述
带gevent的多工管道。
安装
pip install gevent-pipeline
示例
importgeventimportrandomfromitertoolsimportrepeatfromgevent_pipelineimportPipelinedefsample(b):r=random.uniform(0,b)gevent.sleep(r)returnr(Pipeline().from_iter(repeat(1,times=200)).map(sample,n_workers=100).filter(lambdax:x<0.5).fold(max,x0=0,n_workers=50))
管道
将每个层的操作与多个工作人员链接在一起。
示例:
>>>importoperator>>>defonly_odd(x):...returnx&1...>>>defdouble(x):...return2*x...>>>(Pipeline()....from_iter(range(100))....filter(only_odd,n_workers=10)....map(double,n_workers=8)....fold(operator.add,x0=0,n_workers=5))5000
上述功能等同于:
>>>@worker(discard_none=True)...defonly_odd(x):...'''Forward only odd numbers to the next layer'''...ifx&1:...returnx...>>>@worker()...defdouble(x):...return2*x...>>>defload_numbers(q_in,q_out,q_done):...foriinrange(100):...q_out.put(i)...q_done.put(None)...>>>q_out=ClosableQueue()>>>p=(Pipeline()....chain_workers(load_numbers)....chain_workers(only_odd,n_workers=10)....chain_workers(double,n_workers=8,q_out=q_out))>>>sum(iforiinq_out)5000
没有订单保证:
>>>deff(x):...gevent.sleep(random.uniform(0,0.001))...returnx>>>p=Pipeline().from_iter(range(10)).map(f,n_workers=5)>>>list(p)[2,1,4,0,3,5,8,6,7,9]
工人中的异常
事件中有一个预定义的forward_input异常处理程序 函数引发异常处理程序将输入带到 函数并将其作为输出传递。
fromgevent_pipelineimportPipeline,worker,forward_input@worker(exception_handler=forward_input)deff(x):ifx&1:raiseValueError("oh no!")# Will be treated as if it were:# return xelse:return2*xp=(Pipeline().from_iter(range(100)).chain_workers(f,n_workers=10))s_odd=sum(range(1,100,2))s_even=sum(2*iforiinrange(0,100,2))assertsum(p)==s_odd+s_even
closablequeue
行为类似于gevent.queue.Queue,但另外还有一个.close() 调用以下行为的方法:
- 调用.put(item)将成为一个错误
- 连续调用.get()将返回 队列,在该停止迭代之后,将为每个后续的 呼叫
>>>fromgevent_pipelineimportClosableQueue>>>q=ClosableQueue()>>>q.put('hello')>>>q.close()>>>q.get()'hello'>>>q.get()isStopIterationTrue