<p>测试线程(un)安全代码的最常见的解决方案是启动许多线程并希望达到最佳效果。我和我可以想象其他人,都有一个问题,那就是它依赖于机会,它使测试变得“沉重”。</p>
<p>前一阵子我碰到这个问题时,我想追求精确而不是暴力。其结果是一段测试代码,通过让线程并驾齐驱来引起竞争条件。</p>
<h2>赛马代码示例</h2>
<pre><code>spam = []
def set_spam():
spam[:] = foo()
use(spam)
</code></pre>
<p>如果从多个线程调用<code>set_spam</code>,则在修改和使用<code>spam</code>之间存在竞争条件。让我们试着始终如一地复制它。</p>
<h2>如何造成比赛条件</h2>
<pre><code>class TriggeredThread(threading.Thread):
def __init__(self, sequence=None, *args, **kwargs):
self.sequence = sequence
self.lock = threading.Condition()
self.event = threading.Event()
threading.Thread.__init__(self, *args, **kwargs)
def __enter__(self):
self.lock.acquire()
while not self.event.is_set():
self.lock.wait()
self.event.clear()
def __exit__(self, *args):
self.lock.release()
if self.sequence:
next(self.sequence).trigger()
def trigger(self):
with self.lock:
self.event.set()
self.lock.notify()
</code></pre>
<p>然后演示此线程的使用:</p>
<pre><code>spam = [] # Use a list to share values across threads.
results = [] # Register the results.
def set_spam():
thread = threading.current_thread()
with thread: # Acquires the lock.
# Set 'spam' to thread name
spam[:] = [thread.name]
# Thread 'releases' the lock upon exiting the context.
# The next thread is triggered and this thread waits for a trigger.
with thread:
# Since each thread overwrites the content of the 'spam'
# list, this should only result in True for the last thread.
results.append(spam == [thread.name])
threads = [
TriggeredThread(name='a', target=set_spam),
TriggeredThread(name='b', target=set_spam),
TriggeredThread(name='c', target=set_spam)]
# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
thread.sequence = thread_sequence
# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"
</code></pre>
<p>我想我对这个结构解释得够多了,所以你可以根据自己的情况来实现它。我认为这很适合“额外积分”部分:</p>
<blockquote>
<p>extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.</p>
</blockquote>
<h2>解决比赛条件</h2>
<h2>共享变量</h2>
<p>每一个线程问题都以它自己的特定方式解决。在上面的例子中,我通过在线程之间共享一个值导致了一个竞争条件。使用全局变量(如模块属性)时可能会出现类似的问题。解决这些问题的关键可能是使用线程本地存储:</p>
<pre><code># The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = [] # This list only exists in this thread.
results = [] # Results *are* shared though.
def set_spam():
thread = threading.current_thread()
# 'get' or set the 'spam' list. This actually creates a new list.
# If the list was shared among threads this would cause a race-condition.
data.spam = getattr(data, 'spam', [])
with thread:
data.spam[:] = [thread.name]
with thread:
results.append(data.spam == [thread.name])
# Start the threads as in the example above.
assert all(results) # All results should be True.
</code></pre>
<h2>并发读/写</h2>
<p>一个常见的线程问题是多个线程同时读取和/或写入数据保持器的问题。这个问题是通过实现读写锁来解决的。读写锁的实际实现可能不同。您可以选择读优先锁、写优先锁或随机锁。</p>
<p>我相信有一些例子描述了这种锁定技术。我可以稍后再写一个例子,因为这已经是一个很长的答案了。;-)</p>
<h2>注释</h2>
<p>看一看<a href="http://docs.python.org/2/library/threading.html" rel="noreferrer">the threading module documentation</a>并用它做一些实验。由于每个线程问题都不同,因此应用不同的解决方案。</p>
<p>在谈到线程时,请看一下Python GIL(全局解释器锁)。需要注意的是,线程可能不是优化性能的最佳方法(但这不是您的目标)。我觉得这个演示很不错:<a href="https://www.youtube.com/watch?v=zEaosS1U5qY" rel="noreferrer">https://www.youtube.com/watch?v=zEaosS1U5qY</a></p>