在复杂环境中测试异步tornado RequestHandler方法
我正在为一个继承自tornado.web.RequestHandler的类编写单元测试,这个类会对数据库进行聚合查询。我已经浪费了好几天时间想让这些测试正常工作。
测试使用的是pytest和factoryboy。很多重要的tornado类都有用于测试的工厂。
这是我正在测试的类:
SUPPORTED_METHODS = (
"GET", "POST", "OPTIONS")
def get(self):
self.aggregate()
@auth.hmac_auth
#@tornado.web.asynchronous
@tornado.web.removeslash
@tornado.gen.coroutine
def aggregate(self):
'''
'''
self.logger.info('api aggregate')
data = self.data
print("Data: {0}".format(data))
pipeline = data['pipeline']
self.logger.debug('pipeline : {0}'.format(pipeline))
self.logger.debug('utc tz : {0}'.format(tz_util.utc))
# execute pipeline query
print(self.collection)
try:
cursor_future = self.collection.aggregate(pipeline, cursor={})
print(cursor_future)
cursor = yield cursor_future
print("Cursor: {0}".format(cursor))
except Exception as e:
print(e)
documents = yield cursor.to_list(length=None)
self.logger.debug('results : {0}'.format(documents))
# process MongoDB JSON extended
results = json.loads(json_util.dumps(documents))
pipeline = json.loads(json_util.dumps(pipeline))
response_data = {
'pipeline': pipeline,
'results': results
}
self.respond(response_data)
用于测试的方法在这里:
#@tornado.testing.gen_test
def test_time_inside(self):
current_time = gen_time()
past_time = gen_time() - datetime.timedelta(minutes=20)
test_query = copy.deepcopy(QUERY)
oid = ObjectId("53a72de12fb05c0788545ed6")
test_query[0]['$match']['attribute'] = oid
test_query[0]['$match']['date_created']['$gte'] = past_time
test_query[0]['$match']['date_created']['$lte'] = current_time
request = produce.HTTPRequest(
method="GET",
headers=produce.HTTPHeaders(
kwargs = {
"Content-Type": "application/json",
"Accept": "application/json",
"X-Sl-Organization": "test",
"Hmac": "83275edec557e2a339e0ec624201db604645e1e1",
"X-Sl-Username": "test@test.co",
"X-Sl-Expires": 1602011725
}
),
uri="/api/v1/attribute-data/aggregate?{0}".format(json_util.dumps({
"pipeline": test_query
}))
)
self.ARH = produce.AggregateRequestHandler(request=request)
#io_loop = tornado.ioloop.IOLoop.instance()
self.io_loop.run_sync(self.ARH.get)
#def stop_test():
#self.stop()
#self.ARH.test_get(stop_test)
#self.wait()
output = self.ARH.get_written_output()
assert output == ""
这是我为请求处理器设置工厂的方式:
class OutputTestAggregateRequestHandler(slapi.rest.AggregateRequestHandler, tornado.testing.AsyncTestCase):
'''
'''
_written_output = []
def write(self, chunk):
print("Previously written: {0}".format(self._written_output))
print("Len: {0}".format(len(self._written_output)))
if self._finished:
raise RuntimeError("Cannot write() after finish(). May be caused "
"by using async operations without the "
"@asynchronous decorator.")
if isinstance(chunk, dict):
print("Going to encode a chunk")
chunk = escape.json_encode(chunk)
self.set_header("Content-Type", "application/json; charset=UTF-8")
chunk = escape.utf8(chunk)
print("Writing")
self._written_output = []
self._written_output.append(chunk)
print(chunk)
def flush(self, include_footers=False, callback=None):
pass
def get_written_output(self):
for_return = self._written_output
self._written_output = []
return for_return
class AggregateRequestHandler(StreamlyneRequestHandler):
'''
'''
class Meta:
model = OutputTestAggregateRequestHandler
model = slapi.model.AttributeDatum
在运行测试时,测试在def aggregate(self):
方法中停住了,具体是在print(cursor_future)
和print("Cursor: {0}".format(cursor))
之间的某个地方。
在标准输出中你会看到
MotorCollection(Collection(Database(MongoClient([]), u'test'), u'attribute_datum'))
<tornado.concurrent.Future object at 0x7fbc737993d0>
而且测试没有其他输出,失败的地方是
> assert output == ""
E AssertionError: assert [] == ''
经过大量查看文档、示例和Stack Overflow的内容,我终于通过在OutputTestAggregateRequestHandler中添加以下代码,得到了一个可以运行的测试:
def set_io_loop(self):
self.io_loop = tornado.ioloop.IOLoop.instance()
def ioloop(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
print(args)
self.set_io_loop()
return f(self, *args, **kwargs)
return wrapper
def runTest(self):
pass
然后把AggregateRequestHandler.aggregate中的所有代码复制到OutputTestAggregateRequestHandler中,但使用不同的装饰器:
@ioloop
@tornado.testing.gen_test
def _aggregate(self):
......
然后我得到了以下输出:
assert output == ""
E AssertionError: assert ['{\n "pipeline": [\n {\n "$match": {\n "attribute": {\n "$oid"... "$oid": "53cec0e72dc9832c4c4185f2"\n }, \n "quality": 9001\n }\n ]\n}'] == ''
这实际上是成功的,但我只是故意触发了一个断言错误来查看输出。
我面临的最大问题是,如何实现我想要的结果,也就是通过添加额外代码和复制聚合方法得到的输出。
显然,当我把代码从聚合方法中复制出来后,测试在我对实际方法进行更改后就不再有用了。我该如何让实际的聚合方法在测试中正常工作,而不是在遇到异步代码时就停下来呢?
谢谢任何帮助,
祝好!
-Liam
1 个回答
一般来说,测试请求处理器(RequestHandlers)最好的方法是使用AsyncHTTPTestCase,而不是AsyncTestCase。这样做会为你设置好HTTP客户端和服务器,所有的请求都会通过HTTP的流程来处理。虽然在应用程序和HTTP服务器之外使用请求处理器并不是完全支持的,但在Tornado 4.0中,可能可以使用一个虚拟的HTTP连接来避免使用完整的服务器架构。这种方法可能会更快,但目前来说,这还是一个比较新的领域。