一种基于协同程序的执行器实现
coroexecutor的Python项目详细描述
ALPHA
coroexecutor公司
提供一个Executor接口,用于运行一组协同程序 在异步本机应用程序中一起使用。在
演示
importasynciofromcoroexecutorimportCoroutineExecutorasyncdeff(dt):awaitasyncio.sleep(dt)asyncdefmain():asyncwithCoroutineExecutor()asexe:t1=exe.submit(f,0.01)t2=exe.submit(f,0.05)assertt1.done()assertt2.done()asyncio.run(main())
讨论
Executor接口不能完全匹配,因为 此接口中的某些函数需要是async函数。但是我们 可以接近。在
Trio的一些想法被用作灵感:
- CoroutineExecutor等待所有提交的作业完成。在
- 如果任何作业引发异常,则所有其他未完成的作业都将取消 (它们内部产生了取消错误),以及 CoroutineExecutor重新引发相同的异常。在
示例
使用map
concurrent.futures.Executor接口还定义了map(),其中 返回迭代器。但是,我们使用 异步生成器。下面是一个测试的例子:
^{pr2}$您可以看到如何使用async for在 调用map得到的结果。在
如果某个函数调用引发错误,则所有未完成的调用都将 取消,但您可能仍收到部分结果。这是 测试的另一个例子:
times=[0.01,0.02,0.1,0.2]results=[]asyncdeff(dt):awaitasyncio.sleep(dt)ifdt==0.1:raiseException('oh noes')returndtasyncdefmain():asyncwithCoroutineExecutor()asexe:asyncforrinexe.map(f,times):results.append(r)withpytest.raises(Exception):run(main())assertresults==[0.01,0.02]
批处理的前两个值很快完成,我将它们保存到 resultslist在外部作用域中。然后,其中一个工作失败了 一个例外。这会导致其他挂起的作业被取消(即。, 本例中的“0.2”实例,CoroutineExecutor实例 重新引发异常,在本例中,异常引发所有 调用run()函数本身的方法。然而, 请注意,我们仍然有来自成功作业的结果。在
超时
让CoroutineExecutor也应用超时似乎很方便 它管理的一批作业。毕竟,它已经在管理工作, 因此,在触发超时时取消它们看起来很小 额外的工作。在
这是超时的外观(同样,从一个测试中获得):
tasks=[]asyncdeff(dt):awaitasyncio.sleep(dt)asyncdefmain():asyncwithCoroutineExecutor(timeout=0.05)asexe:t1=exe.submit(f,0.01)t2=exe.submit(f,5)tasks.extend([t1,t2])withpytest.raises(asyncio.TimeoutError):run(main())t1,t2=tasksassertt1.done()andnott1.cancelled()assertt2.done()andt2.cancelled()
在执行器内部,有一个快速的工作和一个缓慢的工作。超时将 在快的完成之后,但在慢的完成之前应用。 引发的TimeoutError将取消慢速作业,并将被提升 从executor,实际上一直到run()函数(in 这个例子)。在
嵌套
您不必总是在一个函数中向executor提交任务。 executor实例可以被传递,并且可以向其添加工作 从几个不同的地方。在
fromrandomimportrandomasyncdeff(dt):awaitasyncio.sleep(dt)asyncdefproducer1(executor:CoroutineExecutor):executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())asyncdefproducer2(executor:CoroutineExecutor):executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())asyncdefmain():asyncwithCoroutineExecutor(timeout=0.5)asexecutor:executor.submit(f,random())executor.submit(f,random())executor.submit(f,random())executor.submit(producer1,executor)executor.submit(producer2,executor)run(main())
您不仅可以在executor上下文管理器中提交作业,还可以 传递实例并从其他函数收集作业。以及 创建CoroutineExecutor实例时设置的超时仍将 被应用。在
- 项目
标签: