通过约定和x-appengine-cron头安全地装饰google云cron端点

Flask-CronDecorator的Python项目详细描述


烧瓶crondecorator

通过约定和X-Appengine-Cron头安全地装饰google云cron端点。

the docs

The X-Appengine-Cron header is set internally by Google App Engine. If your request handler finds this header it can trust that the request is a cron request. The X- headers are stripped by App Engine when they originate from external sources so that you can trust this header.

安装

将此行添加到应用程序的requirements.txt中

Flask-CronDecorator

然后执行:

$ pip install -r requirements.txt

或自行安装为:

$ pip install Flask-CronDecorator

用法

下面的代码片段将帮助您编写代码

fromflaskimportFlask,BlueprintfromCronDecoratorimportCronDecoratorimportloggingfromdatetimeimportdatetime,timedeltafrommodelsimportTask,TaskRequestapp=Flask(__name__)app.cron=CronDecorator(app)# blueprint can optionally be passed in for registering cron task endpoints in a blueprintadmin=Blueprint('admin',__name__,template_folder='templates',url_prefix='/admin')blueprint.cron=CronDecorator(app,blueprint)app.register_blueprint(admin)_logger=logging.getLogger(__name__)@admin.cron.task('/purge_tasks',methods=['GET'])# creates /cron/admin/purge_tasks endpointdefpurge_tasks():_logger.info('Purging first 100 Tasks older than 1 year')year_ago=datetime.utcnow()-timedelta(days=365)tasks=Task.query.filter(Task.created<=year_ago).order_by(Task.id.asc()).limit(100).all()fortaskintasks:db.session.delete(task)_logger.info('Purging first 1000 TaskRequests older than 2 weeks')two_weeks_ago=datetime.utcnow()-timedelta(days=14)task_requests=TaskRequest.query.join(Task).filter(models.Task.created<=two_weeks_ago).order_by(Task.id.desc()).limit(1000).all()fortask_requestintask_requests:db.session.delete(task_request)db.session.commit()return'',200

谷歌云设置

根据上面的代码片段,您需要更新google cloud cron.yaml

cron:-description:"PurgesTasksolderthan1yearandTaskRequestsolderthan2weeks"url:/cron/admin/purge_tasksschedule:every 30 minutes

确保google cloud app.yaml中包含了您的/cron/*端点。注意:handlers:script必须是一个wsgi路径,相对于进程的启动位置,而不一定是app.yaml所在的位置,指向Flask应用程序实例化。

runtime:pythonruntime_config:python_version:2threadsafe:trueenv:flexhandlers:-script:flask.appsecure:alwaysurl:/cron/.*

展开

将app和cron.yaml部署到谷歌云

$ gcloud app deploy
$ gcloud app deploy cron.yaml

测试

$ pytest -s tests.py

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java数据未插入SQLite数据库   Java中内存有效的对象创建   java在方法内部使用“this”(不用于调用方法、构造函数或变量)   java为什么这里会出现NullPointerException?   在REST中使用HATEOAS导致的java循环依赖   java如何定制spring boot横幅?   Java数字基数计算器(即基数10到基数5)   如果在Kotlin vs Java中声明,用作全局上下文的安卓 MainApplication类将崩溃   用于过滤对象的Java lambda函数   java从字符串数组中获取整数列表   java为什么Maven找不到org。json JPMS自动模块?   java将字符串数组转换为int   仅当与阈值字节匹配时,java才会在映射中填充字符串值