我使用多线程设计(别无选择),但我的大多数代码都位于一个线程中,其中的所有事件都通过queue进行管理。以这种方式,我的大多数代码表现得好像是单线程的,我不必担心锁、信号量和其他问题。
唉,我已经到了需要联合测试我的代码的地步(请不要因为一开始没有TDDing而大发雷霆),我不知所措——你如何在另一个线程中测试某些东西?
例如,假设我有以下课程:
class MyClass():
def __init__(self):
self.a=0
# register event to self.on_event
def on_some_event(self, b):
self.a += b
def get(self):
return self.a
我想测试一下:
import unittest
from queued_thread import ThreadedQueueHandler
class TestMyClass(unittest.TestCase):
def setUp(self):
# create the queued thread and assign the queue to self.queue
def test_MyClass(self):
mc = MyClass()
self.queue.put({'event_name':'some_event', 'val':1})
self.queue.put({'event_name':'some_event', 'val':2})
self.queue.put({'event_name':'some_event', 'val':3})
self.assertEqual(mc.get(),6)
if __name__ == '__main__':
unittest.main()
MyClass.get()
对于队列线程中的任何内容都可以正常工作,但是测试将在主线程中异步调用它,因此结果可能不正确!
如果你的设计假设所有东西都必须通过队列,那么不要反对它-让所有东西都通过它!
将
on_call
事件添加到排队事件处理程序中,并向其注册以下函数:然后将测试修改为:
您可以查看stdlib测试中的test_threading.py,该测试执行的操作与您尝试执行的操作类似。基本思想是使用互斥和信号量保护线程执行,以便在断言测试条件之前完成执行。
相关问题 更多 >
编程相关推荐