通过异步队列管理事件的简单库
aioevents-ng的Python项目详细描述
通过异步队列管理事件的简单库
安装
pip install aioevents-ng
注意:对于python 3.6,您需要安装dataclasses
pip install dataclasses
用法示例
importasynciofromdataclassesimportdataclassimportaioevents@dataclassclassMyEvent(aioevents.Event):payload:str@aioevents.manager.register(MyEvent)asyncdefevent_hadler(event:aioevents.Event):print(f"recieved: {event}")asyncdefproduce():asyncwithaioevents.eventsasevents:awaitevents.publish(MyEvent("Hello!"))asyncdefmain():aioevents.start(asyncio.get_event_loop())awaitproduce()print('stopping worker')aioevents.stop()# wait for all coroutinesawaitasyncio.sleep(1)if__name__=="__main__":asyncio.run(main())
许可证
aioevents库是在apache 2许可下提供的。