多工作管道和可关闭队列

gevent-pipeline的Python项目详细描述


带gevent的多工管道。

安装

pip install gevent-pipeline

示例

importgeventimportrandomfromitertoolsimportrepeatfromgevent_pipelineimportPipelinedefsample(b):r=random.uniform(0,b)gevent.sleep(r)returnr(Pipeline().from_iter(repeat(1,times=200)).map(sample,n_workers=100).filter(lambdax:x<0.5).fold(max,x0=0,n_workers=50))

管道

将每个层的操作与多个工作人员链接在一起。

示例:

>>>importoperator>>>defonly_odd(x):...returnx&1...>>>defdouble(x):...return2*x...>>>(Pipeline()....from_iter(range(100))....filter(only_odd,n_workers=10)....map(double,n_workers=8)....fold(operator.add,x0=0,n_workers=5))5000

上述功能等同于:

>>>@worker(discard_none=True)...defonly_odd(x):...'''Forward only odd numbers to the next layer'''...ifx&1:...returnx...>>>@worker()...defdouble(x):...return2*x...>>>defload_numbers(q_in,q_out,q_done):...foriinrange(100):...q_out.put(i)...q_done.put(None)...>>>q_out=ClosableQueue()>>>p=(Pipeline()....chain_workers(load_numbers)....chain_workers(only_odd,n_workers=10)....chain_workers(double,n_workers=8,q_out=q_out))>>>sum(iforiinq_out)5000

没有订单保证:

>>>deff(x):...gevent.sleep(random.uniform(0,0.001))...returnx>>>p=Pipeline().from_iter(range(10)).map(f,n_workers=5)>>>list(p)[2,1,4,0,3,5,8,6,7,9]

工人中的异常

事件中有一个预定义的forward_input异常处理程序 函数引发异常处理程序将输入带到 函数并将其作为输出传递。

fromgevent_pipelineimportPipeline,worker,forward_input@worker(exception_handler=forward_input)deff(x):ifx&1:raiseValueError("oh no!")# Will be treated as if it were:# return xelse:return2*xp=(Pipeline().from_iter(range(100)).chain_workers(f,n_workers=10))s_odd=sum(range(1,100,2))s_even=sum(2*iforiinrange(0,100,2))assertsum(p)==s_odd+s_even

closablequeue

行为类似于gevent.queue.Queue,但另外还有一个.close() 调用以下行为的方法:

  • 调用.put(item)将成为一个错误
  • 连续调用.get()将返回 队列,在该停止迭代之后,将为每个后续的 呼叫
>>>fromgevent_pipelineimportClosableQueue>>>q=ClosableQueue()>>>q.put('hello')>>>q.close()>>>q.get()'hello'>>>q.get()isStopIterationTrue

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
相对于框架java窗口的鼠标位置错误   Java 8流peek api   java将数据附加到文件中   java使用ExoPlayer 2.8播放播放列表中的特定文件   JavaSpring国际化:如何动态设置语言环境值   java如何在mysql中实现两个表之间的两个关联   java在gradle可执行jar文件中包含运行时参数   surefire插件中的java maven多套测试套件   java试图理解堆分析以确定内存泄漏或所需的大量内存   java识别字符串有数字   数组如何解决错误“java.lang.ArrayIndexOutOfBoundsException:5”   java Swt文件对话框选择的文件太多?   java此登录代码易受SQL注入攻击吗?   Java[3]中的文件<identifier>预期编译错误   java如何在spring webflux中发送列表   jar中未找到java文件异常   如何在java中合并2D数组?   java如何评测本机JNI库