阿里云函数计算SDK2
aliyun-fc2的Python项目详细描述
概述
此版本的sdk依赖于第三方http库requests。
运行环境
python 2.7和python 3.6
注意
FC和FC2不兼容,现在主回购是FC2,如果您仍然使用FC,则需要1.x分支。 我们建议使用fc2,fc和fc2的主要区别是:
1、所有的http请求功能都可以设置头
definvoke_function(self,serviceName,functionName,payload=None,headers={'x-fc-invocation-type':'Sync','x-fc-log-type':'None'}):...
注意:放弃异步调用函数,只有一个函数接口调用函数,用x-fc-invocation-type参数(sync或async)区分同步和异步。
# sync invokeclient.invoke_function('service_name','function_name')# async invokeclient.invoke_function('service_name','function_name',headers={'x-fc-invocation-type':'Async'})
2,用户返回的所有http响应是以下对象
classFcHttpResponse(object):def__init__(self,headers,data):self._headers=headersself._data=data@propertydefheaders(self):returnself._headers@propertydefdata(self):returnself._data
注意:对于invoke函数,数据是字节,对于其他api,数据是dict
安装
通过pip安装正式版本(以linux为例):
$ pip install aliyun-fc2
您也可以直接安装解压缩的安装程序包:
$ sudo python setup.py install
如果您仍然使用fc,可以通过pip(以linux为例)安装正式的fc1发行版:
$ pip install aliyun-fc
开始
# -*- coding: utf-8 -*-importfc2# To know the endpoint and access key id/secret info, please refer to:# https://help.aliyun.com/document_detail/52984.htmlclient=fc2.Client(endpoint='<Your Endpoint>',accessKeyID='<Your AccessKeyID>',accessKeySecret='<Your AccessKeySecret>')# Create service.client.create_service('service_name')# set vpc config when creating the servicevpcConfig={'vpcId':'<Your Vpc Id>','vSwitchIds':'<[Your VSwitch Ids]>','securityGroupId':'<Your Security Group Id>'}# create vpcConfig when creating the service# you have to set the role if you want to set vpc configvpc_role='acs:ram::12345678:role/aliyunvpcrole'# set nas config when creating the servicenasConfig={"userId":'<The NAS file system user id>',"groupId":'<The NAS file system group id>',"mountPoints":[{"serverAddrserverAddr":'<The NAS file system mount target>',"mountDir":'<The mount dir to the local file system>',}],}service=client.create_service(name,role=vpc_role,vpcConfig=vpcConfig,nasConfig=nasConfig)# Create function.# the current directory has a main.zip file (main.py which has a function of myhandler)# set environment variables {'testKey': 'testValue'}client.create_function('service_name','function_name','python3','main.my_handler',codeZipFile='main.zip',environmentVariables={'testKey':'testValue'})# Create function with initailizer# main.my_initializer is the entry point of initializer interfaceclient.create_function('service_name','function_name','python3','main.my_handler',"main.my_initializer",codeZipFile='main.zip',environmentVariables={'testKey':'testValue'})# Invoke function synchronously.client.invoke_function('service_name','function_name')# Create trigger# Create oss triggeross_trigger_config={'events':['oss:ObjectCreated:*'],'filter':{'key':{'prefix':'prefix','suffix':'suffix'}}}source_arn='acs:oss:cn-shanghai:12345678:bucketName'invocation_role='acs:ram::12345678:role/aliyunosseventnotificationrole'client.create_trigger('service_name','function_name','trigger_name','oss',oss_trigger_config,source_arn,invocation_role)# Create log triggerlog_trigger_config={'sourceConfig':{'logstore':'log_store_source'},'jobConfig':{'triggerInterval':60,'maxRetryTime':10},'functionParameter':{},'logConfig':{'project':'log_project','logstore':'log_store'},'enable':False}source_arn='acs:log:cn-shanghai:12345678:project/log_project'invocation_role='acs:ram::12345678:role/aliyunlogetlrole'client.create_trigger('service_name','function_name','trigger_name','oss',log_trigger_config,source_arn,invocation_role)# Create time triggertime_trigger_config={'payload':'awesome-fc''cronExpression':'0 5 * * * *''enable':true}client.create_trigger('service_name','function_name','trigger_name','timer',time_trigger_config,'','')# Invoke a function with a input parameter.client.invoke_function('service_name','function_name',payload=bytes('hello_world'))# Read a image and invoke a function with the file data as input parameter.src=open('src_image_file_path','rb')# Note: please open it as binary.r=client.invoke_function('service_name','function_name',payload=src)# save the result as the output image.dst=open('dst_image_file_path','wb')dst.write(r.data)src.close()dst.close()# Invoke function asynchronously.client.invoke_function('service_name','function_name',headers={'x-fc-invocation-type':'Async'})# List services.client.list_services()# List functions with prefix and limit.client.list_functions('service_name',prefix='the_prefix',limit=10)# Delete service.client.delete_service('service_name')# Delete function.client.delete_function('service_name','function_name')
测试
要运行测试,请将访问密钥id/secret、端点设置为环境变量。 以Linux系统为例:
$ exportENDPOINT=<endpoint> $ exportACCESS_KEY_ID=<AccessKeyId> $ exportACCESS_KEY_SECRET=<AccessKeySecret> $ exportSTS_TOKEN=<roleARN>
按以下方法运行测试:
$ nosetests # First install nose