事件网关蟒蛇
event-gateway-sdk的Python项目详细描述
事件网关python sdk
用于与Event Gateway交互的python库。
内容
背景
这是python sdk,用于与Event Gateway交互,后者是将事件连接到无服务器函数的中心。
安装
pip install event-gateway-sdk
用法
使用emit
命令向事件网关发出CloudEvent负载。此事件将由订阅您的事件的任何函数接收。
fromeventgatewayimportEventGatewayeg=EventGateway(url="https://mytenant-myapp.slsgateway.com")cloudEvent={"eventType":"user.created","cloudEventsVersion":"0.1","source":"/services/users","data":{"userId":"foo","userName":"bar"}}eg.emit(cloudEvent=configData,path="/user/send-mail-user")
emit()
函数有三个参数:
- 是一个有效的cloudevent,
- 一个
path
,它是与函数关联的路径(默认值:/
) - 表示发送到网关的头的
headers
对象(默认值:{"Content-type": "application/json"}
)
函数返回一个请求对象。如果您的事件附加了sync
订阅,则fetch
响应将具有订阅的状态代码和正文。否则,响应将返回一个带有空正文的202 Accepted
状态代码。
构造函数
在上面的示例中,我们使用serverless,inc.提供的hosted Event Gateway中的应用程序url创建了一个事件网关客户机。
您还可以将事件网关sdk与自己的自托管事件网关一起使用。下面列出了构造函数的详细信息。
参数
url
-string
-可选,事件api url,默认值:http://localhost:4000
space
-string
-可选,空间名称,默认值:default
configurationUrl
-string
-可选,配置API URL。默认情况下,它与url
相同,但具有4001
端口connectorUrl
-string
-可选,连接器api url。默认情况下,它与url
相同,但使用4002
端口accessKey
-string
-可选,宿主事件网关的访问密钥。在托管事件网关上使用配置API方法需要访问密钥
示例
fromeventgatewayimportEventGatewayeg=EventGateway(url="https://mytenant-myapp.slsgateway.com",space="user")
可用功能
检查连接
用于检查与事件网关的连接(使用/v1/status
端点)。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()ifeg.checkConnection():print("Connection succesfull")else:print("Issue while connecting")
打印配置
用于打印当前配置的实用程序。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.printConfig()
创建事件类型
函数创建事件类型。
示例
fromeventgatewayimportEventGatewayeventtype={"name":"http.request"}eg=EventGateway()eg.createEventType(eventtype)
获取事件类型
函数获取事件类型。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getEventType("user.created")
GetAllEventType
函数获取所有事件类型。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getAllEventType()
createFunction
函数创建函数触发器。
示例
fromeventgatewayimportEventGatewayfunction={"functionId":"new-user","type":"http","provider":{"url":"http://myapp.com/user/new"}}eg=EventGateway()eg.createFunction(function)
GetFunction
函数来获取函数。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getFunction("new-user")
GetAllFunction
函数来获取所有函数。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getAllFunction()
创建订阅
函数订阅函数。
示例
fromeventgatewayimportEventGatewaysubscription={"functionId":"new-user","event":"http","method":"POST","path":"/user/new","eventType":"http.request","type":"async"}eg=EventGateway()eg.createSubscription(subscription)
获取订阅
函数获取订阅。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getSubscription("YXN5bmMsaHR0cC5yZXF1ZXN0LG5ldy11c2VyLW9wZW5wYWFzLCUyRmppcmE")
GetAllSubscription
函数获取所有订阅。
示例
fromeventgatewayimportEventGatewayeg=EventGateway()eg.getAllSubscription()
贡献
待定