异步API
asyncapi的Python项目详细描述
Python异步API
用于将asyncapi规范转换为Python代码的Python库,无需生成代码。 在
AsyncAPI模式:https://asyncapi.io
文档:https://dutradda.github.io/asyncapi-python/
源代码:https://github.com/dutradda/asyncapi-python
主要特点
- 在
读取asyncapi规范并从中创建发布者和订阅服务器
在 - 在
支持数据类的规范声明
在 - 在
为创建订阅服务器提供应用程序
在 - 在
支持kafka、redis和postgres协议(与广播程序库相同)
在 - 在
额外支持google cloud pubsub服务
在 - 在
在http中公开自动生成的规范
在
要求
- 在
Python 3.8+
在 - 在
广播员
在 - 在
松道拉
在 - 在
请求(http规范可选)
在 - 在
typer(订户应用可选)
在 - 在
pyyaml(yaml规范可选)
在 - 在
apidaora(暴露规范可选)
在 - 在
包额外安装:
- http协议
- 山药
- 卡夫卡
- 雷迪斯
- 博士后
- 认购人
- 文件
- 谷歌云pubsub
安装
$ pip install asyncapi[http,yaml,redis,subscriber,docs]
YAML规范示例
^{pr2}$创建订户模块
# user_events.pyfromtypingimportAnyasyncdefreceive_user_update(message:Any)->None:print(f"Received update for user id={message.id}")
启动订阅服务器以侦听事件
PYTHONPATH=. asyncapi-subscriber \ --url api-spec.yaml \ --api-module user_events
Waiting messages...
发布更新
# publish.pyimportasynciofromasyncapiimportbuild_apiapi=build_api('api-spec.yaml')channel_id='user/update'message=api.payload(channel_id,id='fake-user',name='Fake User',age=33)asyncdefpublish()->None:awaitapi.connect()awaitapi.publish(channel_id,message)awaitapi.disconnect()asyncio.run(publish())print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
接收更新
Waiting messages...
Received update for user id=fake-user
暴露
asyncapi-docs --path api-spec.yaml
curl -i localhost:5000/asyncapi.yaml
Python规范示例
# specification.pyimportdataclassesfromtypingimportOptionalimportasyncapi@dataclasses.dataclassclassUserUpdatePayload:id:strname:Optional[str]=Noneage:Optional[int]=Nonedev_server=asyncapi.Server(url='localhost',protocol=asyncapi.ProtocolType.REDIS,description='Development Broker Server',)message=asyncapi.Message(name='userUpdate',title='User Update',summary='Inform about users updates',payload=UserUpdatePayload,)user_update_channel=asyncapi.Channel(description='Topic for user updates',subscribe=asyncapi.Operation(operation_id='receive_user_update',message=message,),publish=asyncapi.Operation(message=message),)spec=asyncapi.Specification(info=asyncapi.Info(title='User API',version='1.0.0',description='API to manage users',),servers={'development':dev_server},channels={'user/update':user_update_channel},components=asyncapi.Components(messages={'UserUpdate':message}),)
创建订户模块
# py_spec_user_events.pyimportspecificationspec=specification.specasyncdefreceive_user_update(message:specification.UserUpdatePayload,)->None:print(f"Received update for user id={message.id}")
启动订阅服务器以侦听事件
PYTHONPATH=. asyncapi-subscriber --api-module user_events
Waiting messages...
发布更新
# publish.pyimportasynciofromasyncapiimportbuild_api_auto_specapi=build_api_auto_spec('specification')channel_id='user/update'message=api.payload(channel_id,id='fake-user',name='Fake User',age=33)asyncdefpublish()->None:awaitapi.connect()awaitapi.publish(channel_id,message)awaitapi.disconnect()asyncio.run(publish())print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
接收更新
Waiting messages...
Received update for user id=fake-user
暴露规范
PYTHONPATH=. asyncapi-docs --api-module specification
curl -i localhost:5000/asyncapi.yaml
- 项目
标签: