<p>传统上,在多线程代码中强制争用条件是通过信号量完成的,因此可以强制一个线程等到另一个线程达到某个边缘条件后再继续。</p>
<p>例如,如果对象已经在运行,则对象有一些代码来检查是否未调用<code>start</code>。您可以通过执行以下操作来强制此条件以确保其行为符合预期:</p>
<ul>
<li>启动<code>KitchenTimer</code></li>
<li>在运行状态下在信号量上设置计时器块</li>
<li>在另一个线程中启动同一个计时器</li>
<li>捕捉<code>AlreadyRunningError</code></li>
</ul>
<p>要完成这些任务,您可能需要扩展KitchenTimer类。正式的单元测试通常使用在关键时刻被定义为阻塞的模拟对象。Mock对象是一个比我在这里要讨论的更大的主题,但是在google上搜索“python Mock对象”会找到很多文档和许多可供选择的实现。</p>
<p>有一种方法可以强制代码抛出<code>AlreadyRunningError</code>:</p>
<pre><code>import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def start(self, duration=1, whenTimeUp=None):
KitchenTimer.start(self, duration, whenTimeUp)
with self._runningLock:
print "waiting on _runningLock"
self._runningLock.wait()
def resume(self):
with self._runningLock:
self._runningLock.notify()
timer = TestKitchenTimer()
# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()
# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
thread_2 = threading.Thread(target = timer.start, args = (10, None))
thread_2.start()
except AlreadyRunningError:
print "AlreadyRunningError"
timer.resume()
timer.stop()
</code></pre>
<p>阅读代码,确定一些要测试的边界条件,然后考虑需要暂停计时器以强制出现该条件的位置,并添加条件、信号量、事件等以使其发生。e、 当计时器运行whenTimeUp回调时,如果另一个线程试图停止它,会发生什么情况?您可以通过使计时器在进入时立即等待来强制该条件:</p>
<pre><code>import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def _whenTimeup(self):
with self._runningLock:
self._runningLock.wait()
KitchenTimer._whenTimeup(self)
def resume(self):
with self._runningLock:
self._runningLock.notify()
def TimeupCallback():
print "TimeupCallback was called"
timer = TestKitchenTimer()
# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)
# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()
# Now allow the countdown thread to resume.
timer.resume()
</code></pre>
<p>对要测试的类进行子类化并不是一种很好的测试方法:为了测试每一个类中的竞争条件,基本上必须重写所有的方法,这时就有一个很好的论据可以证明你并没有真正测试原始代码。相反,您可能会发现将信号量正确地放在KitchenTimer对象中(但默认情况下初始化为None)会更干净,并在获取或等待锁之前让您的方法检查<code>if testRunningLock is not None:</code>。然后,可以强制对提交的实际代码进行竞争。</p>
<p>一些关于Python模拟框架的阅读可能会有帮助。实际上,我不确定mock是否有助于测试这段代码:它几乎完全是自包含的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过这些,但是这些文档是一个很好的开始:</p>
<ul>
<li><a href="http://www.voidspace.org.uk/python/mock/getting-started.html">Getting Started with Mock</a></li>
<li><a href="http://farmdev.com/projects/fudge/using-fudge.html">Using Fudge</a></li>
<li><a href="http://agiletesting.blogspot.com/2009/07/python-mock-testing-techniques-and.html">Python Mock Testing Techniques and Tools</a></li>
</ul>