在GAE中使用Python的延迟任务队列

0 投票
2 回答
1151 浏览
提问于 2025-04-18 07:18

抱歉如果这个问题之前有人问过。我搜索了很多地方,找到了一些零散的信息,但没有找到完全能帮助我的答案。

我正在用Python在Google App Engine上开发一个应用,用户可以上传文件,然后这些文件会被一段Python代码处理,处理完的文件会通过邮件发送回用户。

最开始我使用了一个延迟任务来完成这个,效果很好。但随着时间的推移,我意识到处理可能会超过10分钟,这样就会出现DeadlineExceededError的错误,所以我需要想个更聪明的办法。

因此,我开始研究任务队列,想要创建一个可以分块处理文件的队列,最后再把所有部分拼接在一起。

我现在的代码是用来创建单个延迟任务的,代码如下:

_=deferred.defer(transform_function,filename,from,to,email)

这样transform_function函数就能获取文件名、来源、去向和邮箱等值,然后开始处理。

有人能告诉我怎么把这个变成一个线性任务链,让每个任务一个接一个地执行吗?我已经阅读了所有我能想到的Google App Engine的文档,但可惜的是,里面的代码示例不够详细。

我看到有提到一些东西,比如:

taskqueue.add(url='/worker', params={'key': key})

但因为我没有任务的URL,而是有一个在别处实现的transform_function(),所以我不太明白这对我有什么用……

非常感谢!

2 个回答

0

为了避免10分钟的超时,你可以通过使用“_target”参数将请求发送到后台或B类模块。

顺便问一下,你为什么需要按顺序处理这些数据块呢?如果你只是想在所有数据块处理完成后收到通知(这样你就可以在最后把所有东西拼在一起),其实有很多方法可以实现这个目标。例如,你可以为每个数据块设置一个延迟任务,这个任务会减少一个共享的数据计数器(先读取状态,然后减少并在同一个事务中更新),这个计数器一开始就设置为数据块的数量。如果数据存储的更新成功,并且计数器降到零了,你就可以开始把所有的部分组合在一起。还有一种替代方案是使用管道,这样可以简化你提到的工作流程,具体可以参考这个链接:https://code.google.com/p/appengine-pipeline/wiki/GettingStarted

0

你可以一直调用deferred来在每个阶段结束时运行你的任务。其他队列只是让你控制任务的调度和速度,但工作原理是一样的。

我在任务中记录经过的时间,当我接近处理时间的结束时,代码会停止当前的工作,然后调用defer来处理链中的下一个任务,或者继续之前的工作,这取决于是离散的步骤还是连续的工作。这些代码是在任务只能运行60秒的时候写的。

不过你会遇到一个问题(无论是普通任务队列还是deferred),每个阶段可能会因为某种原因失败,然后需要重新运行,所以每个阶段必须是幂等的,也就是说无论执行多少次,结果都是一样的。

对于长时间运行的链式任务,我会在数据存储中创建一个实体,记录要完成的工作的描述,并跟踪这个工作的处理状态,这样你就可以一直重新运行同一个任务直到完成。完成后,它会将这个工作标记为完成。

撰写回答