GAE: 使用测试环境进行任务队列的单元测试

18 投票
4 回答
4790 浏览
提问于 2025-04-16 21:09

我正在使用测试平台来对我的谷歌应用引擎应用进行单元测试,而我的应用使用了任务队列。

当我在单元测试中向任务队列提交一个任务时,虽然看起来这个任务已经在队列里了,但实际上这个任务并没有执行。

我该如何让这个任务在单元测试中执行呢?

4 个回答

14

实现这个功能的另一个(更简洁的)方法是使用测试环境中的任务队列占位符。要做到这一点,你首先需要在你的 setUp() 方法中初始化这个任务队列占位符,具体的代码如下:

self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()

你可以通过以下代码来访问任务调度器:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

与队列占位符交互的接口如下:

GetQueues() #returns a list of dictionaries with information about the available queues

#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

DeleteTask(queue_name, task_name) #removes the task task_name from the given queue

FlushQueue(queue_name) #removes all the tasks from the queue

#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)

StartBackgroundExecution() #Executes the queued tasks

Shutdown() #Requests the task scheduler to shutdown.

另外,由于这使用了App Engine SDK自带的功能,所以它与延迟库一起使用时也能正常工作。

24

根据Saxon的精彩回答,我用testbed代替gaetestbed做了同样的事情。下面是我做的步骤。

我在我的setUp()方法里加了这个:

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

然后,在我的测试中,我用了以下代码:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

在这个过程中,POST参数被进行了base64编码,所以我得把它解码才能让它正常工作。

我觉得这个方法比Saxon的回答更好,因为我可以使用官方的testbed包,并且可以在我自己的测试代码里完成所有操作。

补充一下:后来我想用deferred库提交的任务做同样的事情,花了一些时间才搞明白,所以我在这里分享一下,希望能帮到其他人。

如果你的任务队列里只包含用deferred提交的任务,那么这个代码会运行所有的任务,以及那些任务排队的任何任务:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")
8

开发应用服务器是单线程的,这意味着它在运行测试的时候,不能同时在后台执行其他任务。

我修改了gaetestbed中的taskqueue.py文件里的TaskQueueTestCase,添加了一个新的功能:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the 
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

为了让这个功能正常工作,我还把TaskQueueTestCase的基类从BaseTestCase改成了WebTestCase。

然后我的测试代码大概是这样的:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

这样一来,任务就可以直接从前台的单元测试中执行了。这和生产环境中的情况不完全一样(也就是说,任务会在“稍后某个时间”在一个单独的请求中执行),但对我来说已经足够好了。

撰写回答