Asyncio Python的节流器
aiothrottler的Python项目详细描述
所有转子
异步python的节流器
安装
pip install aiothrottler
用法
创建共享的Throttler
,传递最小间隔,例如0.5
秒
fromaiothrottlerimportThrottlerthrottler=Throttler(min_interval=0.5)
然后在要限制的代码片段之前,callthis和await
它的结果。
awaitthrottler()# There will be a gap of at least 0.5 seconds# between executions reaching this line
示例:多个任务被限制
importasyncioimporttimefromaiothrottlerimportThrottlerasyncdefmain():throttler=Throttler(min_interval=0.5)awaitasyncio.gather(*[worker(throttler)for_inrange(10)])asyncdefworker(throttler):awaitthrottler()# Interval of at least 0.5 seconds between prints# even though all workers started togetherprint(time.time())loop=asyncio.get_event_loop()loop.run_until_complete(main())loop.close()
示例:单任务限制/平滑
importasyncioimportrandomimporttimefromaiothrottlerimportThrottlerasyncdefmain():throttler=Throttler(min_interval=0.5)for_inrange(10):awaitthrottler()# Interval of at least 0.5 seconds between prints# even though each sleep is randomprint(time.time())awaitasyncio.sleep(random.random())loop=asyncio.get_event_loop()loop.run_until_complete(main())loop.close()
与备选方案的差异
api具有一个函数来调用
await
其结果[有些使用上下文管理器]api是必需的[有些使用函数方法/高阶函数]
不使用轮询[有些使用内部轮询]
使用minimum interval between resolutions来限制[而不是每个时间间隔的max resolutions,这可能导致不规则的分辨率模式]
这些测试包括边缘情况,例如在任务被限制后断言限制已被取消[某些替代方案没有]