一种基于协同程序的执行器实现

coroexecutor的Python项目详细描述


https://github.com/cjrh/coroexecutor/workflows/Python%20application/badge.svghttps://img.shields.io/badge/stdlib--only-yes-green.svghttps://coveralls.io/repos/github/cjrh/coroexecutor/badge.svg?branch=masterhttps://img.shields.io/pypi/pyversions/coroexecutor.svghttps://img.shields.io/github/tag/cjrh/coroexecutor.svghttps://img.shields.io/badge/install-pip%20install%20coroexecutor-ff69b4.svghttps://img.shields.io/pypi/v/coroexecutor.svghttps://img.shields.io/badge/calver-YYYY.MM.MINOR-22bfda.svg

ALPHA

coroexecutor公司

提供一个Executor接口,用于运行一组协同程序 在异步本机应用程序中一起使用。在

演示

importasynciofromcoroexecutorimportCoroutineExecutorasyncdeff(dt):awaitasyncio.sleep(dt)asyncdefmain():asyncwithCoroutineExecutor()asexe:t1=exe.submit(f,0.01)t2=exe.submit(f,0.05)assertt1.done()assertt2.done()asyncio.run(main())

讨论

Executor接口不能完全匹配,因为 此接口中的某些函数需要是async函数。但是我们 可以接近。在

Trio的一些想法被用作灵感:

  • CoroutineExecutor等待所有提交的作业完成。在
  • 如果任何作业引发异常,则所有其他未完成的作业都将取消 (它们内部产生了取消错误),以及 CoroutineExecutor重新引发相同的异常。在

示例

使用map

concurrent.futures.Executor接口还定义了map(),其中 返回迭代器。但是,我们使用 异步生成器。下面是一个测试的例子:

^{pr2}$

您可以看到如何使用async for在 调用map得到的结果。在

如果某个函数调用引发错误,则所有未完成的调用都将 取消,但您可能仍收到部分结果。这是 测试的另一个例子:

times=[0.01,0.02,0.1,0.2]results=[]asyncdeff(dt):awaitasyncio.sleep(dt)ifdt==0.1:raiseException('oh noes')returndtasyncdefmain():asyncwithCoroutineExecutor()asexe:asyncforrinexe.map(f,times):results.append(r)withpytest.raises(Exception):run(main())assertresults==[0.01,0.02]

批处理的前两个值很快完成,我将它们保存到 resultslist在外部作用域中。然后,其中一个工作失败了 一个例外。这会导致其他挂起的作业被取消(即。, 本例中的“0.2”实例,CoroutineExecutor实例 重新引发异常,在本例中,异常引发所有 调用run()函数本身的方法。然而, 请注意,我们仍然有来自成功作业的结果。在

超时

CoroutineExecutor也应用超时似乎很方便 它管理的一批作业。毕竟,它已经在管理工作, 因此,在触发超时时取消它们看起来很小 额外的工作。在

这是超时的外观(同样,从一个测试中获得):

tasks=[]asyncdeff(dt):awaitasyncio.sleep(dt)asyncdefmain():asyncwithCoroutineExecutor(timeout=0.05)asexe:t1=exe.submit(f,0.01)t2=exe.submit(f,5)tasks.extend([t1,t2])withpytest.raises(asyncio.TimeoutError):run(main())t1,t2=tasksassertt1.done()andnott1.cancelled()assertt2.done()andt2.cancelled()

在执行器内部,有一个快速的工作和一个缓慢的工作。超时将 在快的完成之后,但在慢的完成之前应用。 引发的TimeoutError将取消慢速作业,并将被提升 从executor,实际上一直到run()函数(in 这个例子)。在

嵌套

您不必总是在一个函数中向executor提交任务。 executor实例可以被传递,并且可以向其添加工作 从几个不同的地方。在

fromrandomimportrandomasyncdeff(dt):awaitasyncio.sleep(dt)asyncdefproducer1(executor:CoroutineExecutor):executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())asyncdefproducer2(executor:CoroutineExecutor):executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())asyncdefmain():asyncwithCoroutineExecutor(timeout=0.5)asexecutor:executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())executor.submit(producer1,executor)executor.submit(producer2,executor)run(main())

您不仅可以在executor上下文管理器中提交作业,还可以 传递实例并从其他函数收集作业。以及 创建CoroutineExecutor实例时设置的超时仍将 被应用。在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
使用jaxrpc的Java eclipse WebService客户端   java编程方式在对象上写入名称   java Spring批处理:重试后跳过   java Android错误:错误:任务执行失败:应用程序:transformClassesWithDexForDebug'   带有清单文件nullPointerException的java Android元数据   spring Java Quartz调度作业停止运行   JavaMockito:如何在不调用实际方法的情况下,模拟带有参数和无效返回类型的静态方法?   java Tomcat连接池问题无法在关闭的连接上调用方法   java如何交换列表中的项目?   java如何停止线程并通过Toast在线程中正确显示文本?   java为什么连续写入OutputStream时偏移量0不会导致重复字节?   java我无法生成头文件   不兼容的返回类型错误java   修改值后键值对的java Jolt转换规范   java有自动更新Javadoc的工具吗?   java线程如何在ints自身实例类中共享变量   java继承一个非gwt模块   java Hibernate xml配置   使用netty4异步调用的java链接HTTP请求响应