在复杂环境中测试异步tornado RequestHandler方法

0 投票
1 回答
1043 浏览
提问于 2025-04-18 14:20

我正在为一个继承自tornado.web.RequestHandler的类编写单元测试,这个类会对数据库进行聚合查询。我已经浪费了好几天时间想让这些测试正常工作。
测试使用的是pytest和factoryboy。很多重要的tornado类都有用于测试的工厂。
这是我正在测试的类:

SUPPORTED_METHODS = (
    "GET", "POST", "OPTIONS")


def get(self):
    self.aggregate()


@auth.hmac_auth
#@tornado.web.asynchronous
@tornado.web.removeslash
@tornado.gen.coroutine
def aggregate(self):
    '''
    '''
    self.logger.info('api aggregate')
    data = self.data
    print("Data: {0}".format(data))


    pipeline = data['pipeline']


    self.logger.debug('pipeline : {0}'.format(pipeline))
    self.logger.debug('utc tz : {0}'.format(tz_util.utc))


    # execute pipeline query
    print(self.collection)
    try:
        cursor_future = self.collection.aggregate(pipeline, cursor={})
        print(cursor_future)
        cursor = yield cursor_future
        print("Cursor: {0}".format(cursor))
    except Exception as e:
        print(e)
    documents = yield cursor.to_list(length=None)


    self.logger.debug('results : {0}'.format(documents))


    # process MongoDB JSON extended
    results = json.loads(json_util.dumps(documents))
    pipeline = json.loads(json_util.dumps(pipeline))


    response_data = {
        'pipeline': pipeline,
        'results': results
    }


    self.respond(response_data)

用于测试的方法在这里:

#@tornado.testing.gen_test
def test_time_inside(self):
    current_time = gen_time()
    past_time =  gen_time() - datetime.timedelta(minutes=20)


    test_query = copy.deepcopy(QUERY)
    oid = ObjectId("53a72de12fb05c0788545ed6")
    test_query[0]['$match']['attribute'] = oid
    test_query[0]['$match']['date_created']['$gte'] = past_time
    test_query[0]['$match']['date_created']['$lte'] = current_time


    request = produce.HTTPRequest(
        method="GET",
        headers=produce.HTTPHeaders(
            kwargs = {
                "Content-Type": "application/json",
                "Accept": "application/json",
                "X-Sl-Organization": "test",
                "Hmac": "83275edec557e2a339e0ec624201db604645e1e1",
                "X-Sl-Username": "test@test.co",
                "X-Sl-Expires": 1602011725
            }
        ),
        uri="/api/v1/attribute-data/aggregate?{0}".format(json_util.dumps({
            "pipeline": test_query
        }))
    )


    self.ARH = produce.AggregateRequestHandler(request=request)


    #io_loop = tornado.ioloop.IOLoop.instance()
    self.io_loop.run_sync(self.ARH.get)


    #def stop_test():
        #self.stop()


    #self.ARH.test_get(stop_test)
    #self.wait()


    output = self.ARH.get_written_output()


    assert output == ""

这是我为请求处理器设置工厂的方式:

class OutputTestAggregateRequestHandler(slapi.rest.AggregateRequestHandler, tornado.testing.AsyncTestCase):
    '''
    '''


    _written_output = []




    def write(self, chunk):
        print("Previously written: {0}".format(self._written_output))
        print("Len: {0}".format(len(self._written_output)))
        if self._finished:
            raise RuntimeError("Cannot write() after finish().  May be caused "
                               "by using async operations without the "
                               "@asynchronous decorator.")
        if isinstance(chunk, dict):
            print("Going to encode a chunk")
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        chunk = escape.utf8(chunk)
        print("Writing")
        self._written_output = []
        self._written_output.append(chunk)
        print(chunk)




    def flush(self, include_footers=False, callback=None):
        pass




    def get_written_output(self):
        for_return = self._written_output
        self._written_output = []
        return for_return




class AggregateRequestHandler(StreamlyneRequestHandler):
    '''
    '''


    class Meta:
        model = OutputTestAggregateRequestHandler


    model = slapi.model.AttributeDatum

在运行测试时,测试在def aggregate(self):方法中停住了,具体是在print(cursor_future)print("Cursor: {0}".format(cursor))之间的某个地方。

在标准输出中你会看到

MotorCollection(Collection(Database(MongoClient([]), u'test'), u'attribute_datum'))
<tornado.concurrent.Future object at 0x7fbc737993d0>

而且测试没有其他输出,失败的地方是

>       assert output == ""
E       AssertionError: assert [] == ''

经过大量查看文档、示例和Stack Overflow的内容,我终于通过在OutputTestAggregateRequestHandler中添加以下代码,得到了一个可以运行的测试:

def set_io_loop(self):
    self.io_loop = tornado.ioloop.IOLoop.instance()


def ioloop(f):
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        print(args)
        self.set_io_loop()
        return f(self, *args, **kwargs)
    return wrapper


def runTest(self):
    pass

然后把AggregateRequestHandler.aggregate中的所有代码复制到OutputTestAggregateRequestHandler中,但使用不同的装饰器:

@ioloop
@tornado.testing.gen_test
def _aggregate(self):
    ......

然后我得到了以下输出:

assert output == ""
E       AssertionError: assert ['{\n    "pipeline": [\n        {\n            "$match": {\n                "attribute": {\n                    "$oid"...                "$oid": "53cec0e72dc9832c4c4185f2"\n            }, \n            "quality": 9001\n        }\n    ]\n}'] == ''

这实际上是成功的,但我只是故意触发了一个断言错误来查看输出。

我面临的最大问题是,如何实现我想要的结果,也就是通过添加额外代码和复制聚合方法得到的输出。
显然,当我把代码从聚合方法中复制出来后,测试在我对实际方法进行更改后就不再有用了。我该如何让实际的聚合方法在测试中正常工作,而不是在遇到异步代码时就停下来呢?
谢谢任何帮助,
祝好!
-Liam

1 个回答

0

一般来说,测试请求处理器(RequestHandlers)最好的方法是使用AsyncHTTPTestCase,而不是AsyncTestCase。这样做会为你设置好HTTP客户端和服务器,所有的请求都会通过HTTP的流程来处理。虽然在应用程序和HTTP服务器之外使用请求处理器并不是完全支持的,但在Tornado 4.0中,可能可以使用一个虚拟的HTTP连接来避免使用完整的服务器架构。这种方法可能会更快,但目前来说,这还是一个比较新的领域。

撰写回答