通过约定和x-appengine-cron头安全地装饰google云cron端点

Flask-CronDecorator的Python项目详细描述


烧瓶crondecorator

通过约定和X-Appengine-Cron头安全地装饰google云cron端点。

the docs

The X-Appengine-Cron header is set internally by Google App Engine. If your request handler finds this header it can trust that the request is a cron request. The X- headers are stripped by App Engine when they originate from external sources so that you can trust this header.

安装

将此行添加到应用程序的requirements.txt中

Flask-CronDecorator

然后执行:

$ pip install -r requirements.txt

或自行安装为:

$ pip install Flask-CronDecorator

用法

下面的代码片段将帮助您编写代码

fromflaskimportFlask,BlueprintfromCronDecoratorimportCronDecoratorimportloggingfromdatetimeimportdatetime,timedeltafrommodelsimportTask,TaskRequestapp=Flask(__name__)app.cron=CronDecorator(app)# blueprint can optionally be passed in for registering cron task endpoints in a blueprintadmin=Blueprint('admin',__name__,template_folder='templates',url_prefix='/admin')blueprint.cron=CronDecorator(app,blueprint)app.register_blueprint(admin)_logger=logging.getLogger(__name__)@admin.cron.task('/purge_tasks',methods=['GET'])# creates /cron/admin/purge_tasks endpointdefpurge_tasks():_logger.info('Purging first 100 Tasks older than 1 year')year_ago=datetime.utcnow()-timedelta(days=365)tasks=Task.query.filter(Task.created<=year_ago).order_by(Task.id.asc()).limit(100).all()fortaskintasks:db.session.delete(task)_logger.info('Purging first 1000 TaskRequests older than 2 weeks')two_weeks_ago=datetime.utcnow()-timedelta(days=14)task_requests=TaskRequest.query.join(Task).filter(models.Task.created<=two_weeks_ago).order_by(Task.id.desc()).limit(1000).all()fortask_requestintask_requests:db.session.delete(task_request)db.session.commit()return'',200

谷歌云设置

根据上面的代码片段,您需要更新google cloud cron.yaml

cron:-description:"PurgesTasksolderthan1yearandTaskRequestsolderthan2weeks"url:/cron/admin/purge_tasksschedule:every 30 minutes

确保google cloud app.yaml中包含了您的/cron/*端点。注意:handlers:script必须是一个wsgi路径,相对于进程的启动位置,而不一定是app.yaml所在的位置,指向Flask应用程序实例化。

runtime:pythonruntime_config:python_version:2threadsafe:trueenv:flexhandlers:-script:flask.appsecure:alwaysurl:/cron/.*

展开

将app和cron.yaml部署到谷歌云

$ gcloud app deploy
$ gcloud app deploy cron.yaml

测试

$ pytest -s tests.py

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Android Action_Edit Intent无法像以前一样调用App Gallery来编辑图片   确保JRE兼容性的java适当程序(32或64位)   java JSONArray。for循环中的add(JSONObject)正在替换for循环中的旧值,数组由循环中的最后一个值组成   java需要帮助创建一个返回数组的方法,该数组的元素是另一个数组的平方   使用SmbFile w/groovy XmlSluper()创建xml。解析()Java   检查大小后的java ArrayIndexOutOfBoundsException   乘法表中的第k个最小元素   java 401 on请求,其中指定了'permitAll()'   java如何附加ORC文件   java hibernate类模型   java IDEA没有看到由自定义注释处理器生成的方法   Servlet中未声明java SerialVersionId   java linkedlist到达列表末尾时   java如何正确对齐EditText光标?   java 6编译器1.6上的eclipse重写方法错误   java如何在基于Jersey的RESTful Web服务中读取post数据   java如何在活动中正确使用接口?   Java的JIT编译器的工作速度有多快?