任务分发 - 如何批量添加超过5个任务到队列
我正在使用一个任务(叫做queueing-task)来排队多个其他任务,这种方式叫做“扇出”。当我尝试用Queue.add方法,并且传入的任务参数是一个包含超过5个元素的Task实例的列表,并且是在一个事务中时… 我遇到了这个错误。
JointException: taskqueue.DatastoreError caused by:
<class 'google.appengine.api.datastore_errors.BadRequestError'>
Too many messages, maximum allowed 5
有没有其他方法可以在一个事务中排队超过5个任务呢?
或者…
也许我根本不需要事务,因为:
- 我并不在乎这些任务是否会被排队两次,反正我不介意,
- 如果其中任何一个任务排队失败,那么整个queueing-task会重新运行。
所以请告诉我如何在一个事务中排队超过5个任务,或者告诉我不需要使用事务,因为我其实并不需要。
1 个回答
2
解决你问题的一个方法是添加一个事务性任务,这个任务可以把剩下的任务分发出去。你只需要在现有的事务中添加这个分发任务就可以了。
除非有业务逻辑的原因,否则不要重新运行已经执行过的任务。防止任务被重复插入(也就是重复执行)是很简单的,这样可以节省资源。你的分发任务大致会是这样的:
class FanOutTask(webapp.RequestHandler):
def get(self):
name = self.request.get('name')
params = deserialize(self.request.get('params'))
try:
task_params = params.get('stuff')
taskqueue.add(url='/worker/1', name=name + '-1', params=task_params)
except TaskAlreadyExistsError:
pass
try:
task_params = params.get('more')
taskqueue.add(url='/worker/2', name=name + '-2', params=task_params)
except TaskAlreadyExistsError:
pass
通过事务性地添加分发任务,可以确保它被放入队列中。如果这个任务已经执行过了,产生的错误会被捕捉并忽略,而其他错误则会导致分发任务重新执行。使用这种方式,你可以很轻松地插入很多子任务。