Flask-CronDecorator
Securely decorate Google Cloud cron endpoints via convention and the X-Appengine-Cron header.
Per the docs:
The X-Appengine-Cron header is set internally by Google App Engine. If your request handler finds this header it can trust that the request is a cron request. The X- headers are stripped by App Engine when they originate from external sources so that you can trust this header.
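The header check described in the quote can be sketched with nothing but the standard library. This is only an illustration of the idea, not how Flask-CronDecorator is implemented: the names cron_only, purge_app and call are hypothetical, the 403 response for non-cron requests is an assumed choice, and the sketch relies on App Engine sending the header with the value "true".

```python
def cron_only(wsgi_app):
    """Wrap a WSGI app so that only App Engine cron requests reach it."""
    def guarded(environ, start_response):
        # WSGI exposes the X-Appengine-Cron header as HTTP_X_APPENGINE_CRON.
        # Assumption: reject anything without the header with a 403.
        if environ.get("HTTP_X_APPENGINE_CRON") != "true":
            start_response("403 Forbidden", [("Content-Type", "text/plain")])
            return [b"Forbidden"]
        return wsgi_app(environ, start_response)
    return guarded


def purge_app(environ, start_response):
    """Hypothetical cron task standing in for a real handler."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"purged"]


guarded_app = cron_only(purge_app)


def call(app, environ):
    """Invoke a WSGI app and return (status, body) for inspection."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    body = b"".join(app(environ, start_response))
    return captured["status"], body
```

Calling call(guarded_app, {"HTTP_X_APPENGINE_CRON": "true"}) reaches the task, while a request without the header is turned away. The check is only trustworthy when the app runs behind App Engine, which strips the header from external requests.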
Installation
Add this line to your application's requirements.txt:
Flask-CronDecorator
And then execute:
$ pip install -r requirements.txt
Or install it yourself as:
$ pip install Flask-CronDecorator
Usage
The following snippet should help you get started:
from flask import Flask, Blueprint
from CronDecorator import CronDecorator
import logging
from datetime import datetime, timedelta
# db is assumed to be the SQLAlchemy handle defined alongside the models
from models import db, Task, TaskRequest

app = Flask(__name__)
app.cron = CronDecorator(app)

# blueprint can optionally be passed in for registering cron task endpoints in a blueprint
admin = Blueprint('admin', __name__, template_folder='templates', url_prefix='/admin')
admin.cron = CronDecorator(app, admin)
app.register_blueprint(admin)

_logger = logging.getLogger(__name__)


@admin.cron.task('/purge_tasks', methods=['GET'])  # creates /cron/admin/purge_tasks endpoint
def purge_tasks():
    _logger.info('Purging first 100 Tasks older than 1 year')
    year_ago = datetime.utcnow() - timedelta(days=365)
    tasks = Task.query.filter(Task.created <= year_ago).order_by(Task.id.asc()).limit(100).all()
    for task in tasks:
        db.session.delete(task)

    _logger.info('Purging first 1000 TaskRequests older than 2 weeks')
    two_weeks_ago = datetime.utcnow() - timedelta(days=14)
    task_requests = TaskRequest.query.join(Task).filter(Task.created <= two_weeks_ago).order_by(Task.id.desc()).limit(1000).all()
    for task_request in task_requests:
        db.session.delete(task_request)

    db.session.commit()
    return '', 200
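The comment in the snippet states that a task registered as '/purge_tasks' on a blueprint with url_prefix '/admin' ends up at /cron/admin/purge_tasks. A minimal sketch of that naming convention follows. The helper cron_url is hypothetical and not part of the package, and the no-blueprint result (/cron/purge_tasks) is an assumption rather than something the README confirms.

```python
def cron_url(rule, url_prefix=""):
    """Build the endpoint path following the /cron/<prefix>/<rule> convention."""
    # Strip slashes from each part so the pieces join cleanly, and drop
    # the blueprint prefix when none is given.
    parts = ["cron", url_prefix.strip("/"), rule.strip("/")]
    return "/" + "/".join(part for part in parts if part)


blueprint_path = cron_url("/purge_tasks", url_prefix="/admin")
app_level_path = cron_url("/purge_tasks")  # assumed behaviour without a blueprint
```

The resulting path is the value that belongs in the url field of cron.yaml.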
Google Cloud Setup
Based on the snippet above, you will need to update your Google Cloud cron.yaml:
cron:
- description: "Purges Tasks older than 1 year and TaskRequests older than 2 weeks"
  url: /cron/admin/purge_tasks
  schedule: every 30 minutes
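Make sure your /cron/* endpoints are covered in your Google Cloud app.yaml. Note: handlers: script must be a WSGI path that points to where the Flask app is instantiated. The path is relative to where the process is launched, which is not necessarily where app.yaml lives.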
runtime: python
runtime_config:
  python_version: 2
threadsafe: true
env: flex
handlers:
- script: flask.app
  secure: always
  url: /cron/.*
Deploy
Deploy the app and cron.yaml to Google Cloud:
$ gcloud app deploy
$ gcloud app deploy cron.yaml
Testing
$ pytest -s tests.py