aws的lambda框架
py-lambda的Python项目详细描述
aws alb lambda python
负载平衡器和lambda函数的框架。
要了解有关在python中编写aws lambda函数的更多信息,请转到the official documentation
关于aws lambda和python的博客是here
入门
这是用python编写的aws lambda框架。当lambda函数由alb或api网关触发时,包含所有必需信息的事件变量将被发送到init函数。默认情况下,此init函数将位于lambda_function.py中,名称将为lambda_handler。此框架包含必须发送回负载平衡器的url和响应的请求资源映射。
包裹信息在Pypi info
pip install py-lambda
此源代码包含以下文件:
- lambda_函数:包含将由alb或api网关等触发的代码。
- request:包含从作为输入发送的事件创建的请求对象。
- response:包含应该从lambda函数发送回alb的对象。
- lambda_mapping:包含路由映射的文件。在这里添加您的路线。
- all_函数:包含函数包含可用于给定路径的相应代码。
- test_lambdahandler:包含可用于排序所有函数的测试代码。
负载均衡器输入
## test_LambdaHandler.pyimportpy_lambda.test_LambdaHandleraslTestprint(lTest.EVENT_FROM_ALB)
上面的代码将给出alb将发送的请求的示例。
映射
## lambda_mapping.pyFUNCT_MAPPING={"":{"":allFun.index},"test":{"":allFun.test,ld.URL_REST_ID_STR:{"":allFun.testUserId,},},"abc":{"":allFun.abc,},}
这将有每个资源的映射。映射是字典,其中键是url资源,值是调用url时必须执行的函数。映射中的索引url是第一个"": {"": allFun.index}
。如果映射的所有函数都在all_Functions.py
中,那么可以在这里调用相应的函数。
如果资源包含一些将更改每个请求的信息,那么映射应该包含URL_REST_ID_STR
,它在lambda_defines.py
表单中,您将能够映射url。
例如:mydomain.com/profile/user_id/image
在这种情况下,每个用户的USER_ID
可能会改变。在这种情况下,在映射中使用URL_REST_ID_STR
输入有效性
## inp_validatn.py## VALID_DOMAINS contains all the valid domains.## VALID_SUB_DOMAINS contails all the valid subdomains.importpy_lambda.funct_definesasfunDefret_validation=validateDomain("http://some_domain.com")##If the domain is valid then the function will return the success responseifret_validation[funDef.FUNCTION_STAT]==funDef.SUCCESS:print("Valid domain")else:print("invalid domain")
在这个函数中可以添加输入验证。validateDomain
将验证给定域,如果该域有效,则返回success。
请求
## request.pyimportpy_lambda.requestasReqimportpy_lambda.test_LambdaHandleraslTestreqObj=Req.Request(lTest.EVENT_FROM_ALB)print("Path Str:",reqObj.getPathStr())print("Req method:",reqObj.httpMeth())print("Req from ALB:",reqObj.isAlb())print("Cookies:",reqObj.getCookies())print("Query params by input list:",reqObj.getQueryParams(['a']))print("Query params all:",reqObj.getAllQueryParam())print("Base64 encooded:",reqObj.isBase64())
当请求发送到lambda函数时,将创建此请求对象对象,我们可以使用此对象从中获取所需的所有信息,如cookie、查询参数、头等。
响应
## response.pyimportpy_lambda.responseasResrespObj=Res.Response()respObj=Res.Response()print("respObj:",respObj())respObj.setError()print("respObj:",respObj())respObj.setResp()print("respObj:",respObj())respObj.setResp(respBody={'a':1,'b':2})print("respObj:",respObj())# respObj.setContentType("application/json")# print("respObj:", respObj())respObj.setHeader("Access-Control-Allow-Origin","*")print("respObj:",respObj())
loab balancer以特定格式重新查询lambda函数的响应。此响应对象是在调用函数时创建的,并且将相应地更改此对象以发送预期的响应。
lambda函数
## lambda_function.pyimportpy_lambda.test_LambdaHandleraslTestimportpy_lambda.lambda_functionaslfRESOURCE_PREFIX=""lTest.EVENT_FROM_ALB["path"]=RESOURCE_PREFIX+'/abc'lTest.EVENT_FROM_ALB["httpMethod"]="GET"resp=lf.lambda_handler(event=lTest.EVENT_FROM_ALB,context="some_context")print(resp)
如果您在lambda_mapping.py
中有一个映射,这将为您提供响应。响应中的正文将包含您在响应中设置的任何内容。甚至可以使用py_lambda.response.setResp
和py_lambda.response.setHeader
测试
## test_LambdaHandler.pyimportpy_lambda.test_LambdaHandleraslTestprint(lTest.testOptions())
它包含了所有用于构建它的测试函数。