GAE: 使用测试环境进行任务队列的单元测试
我正在使用测试平台来对我的谷歌应用引擎应用进行单元测试,而我的应用使用了任务队列。
当我在单元测试中向任务队列提交一个任务时,虽然看起来这个任务已经在队列里了,但实际上这个任务并没有执行。
我该如何让这个任务在单元测试中执行呢?
4 个回答
实现这个功能的另一个(更简洁的)方法是使用测试环境中的任务队列占位符。要做到这一点,你首先需要在你的 setUp()
方法中初始化这个任务队列占位符,具体的代码如下:
self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()
你可以通过以下代码来访问任务调度器:
taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
与队列占位符交互的接口如下:
GetQueues() #returns a list of dictionaries with information about the available queues
#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)
DeleteTask(queue_name, task_name) #removes the task task_name from the given queue
FlushQueue(queue_name) #removes all the tasks from the queue
#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)
StartBackgroundExecution() #Executes the queued tasks
Shutdown() #Requests the task scheduler to shutdown.
另外,由于这使用了App Engine SDK自带的功能,所以它与延迟库一起使用时也能正常工作。
根据Saxon的精彩回答,我用testbed代替gaetestbed做了同样的事情。下面是我做的步骤。
我在我的setUp()
方法里加了这个:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
然后,在我的测试中,我用了以下代码:
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
在这个过程中,POST参数被进行了base64编码,所以我得把它解码才能让它正常工作。
我觉得这个方法比Saxon的回答更好,因为我可以使用官方的testbed包,并且可以在我自己的测试代码里完成所有操作。
补充一下:后来我想用deferred库提交的任务做同样的事情,花了一些时间才搞明白,所以我在这里分享一下,希望能帮到其他人。
如果你的任务队列里只包含用deferred提交的任务,那么这个代码会运行所有的任务,以及那些任务排队的任何任务:
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
开发应用服务器是单线程的,这意味着它在运行测试的时候,不能同时在后台执行其他任务。
我修改了gaetestbed中的taskqueue.py文件里的TaskQueueTestCase,添加了一个新的功能:
def execute_tasks(self, application):
"""
Executes all currently queued tasks, and also removes them from the
queue.
The tasks are execute against the provided web application.
"""
# Set up the application for webtest to use (resetting _app in case a
# different one has been used before).
self._app = None
self.APPLICATION = application
# Get all of the tasks, and then clear them.
tasks = self.get_tasks()
self.clear_task_queue()
# Run each of the tasks, checking that they succeeded.
for task in tasks:
response = self.post(task['url'], task['params'])
self.assertOK(response)
为了让这个功能正常工作,我还把TaskQueueTestCase的基类从BaseTestCase改成了WebTestCase。
然后我的测试代码大概是这样的:
# Do something which enqueues a task.
# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)
# Now test that the task did what was expected.
这样一来,任务就可以直接从前台的单元测试中执行了。这和生产环境中的情况不完全一样(也就是说,任务会在“稍后某个时间”在一个单独的请求中执行),但对我来说已经足够好了。