如何在这个python代码中可靠地重现竞争条件?

2024-03-29 09:26:26 发布

您现在位置:Python中文网/ 问答频道 /正文

上下文

我最近发了一个timer class for review on Code Review。我曾有过一种直觉,觉得有并发错误,因为我曾经看到过一个单元测试失败,但无法重现失败。因此,我的代码后审查。

我得到了一些很好的反馈,强调了代码中的各种比赛条件。(我想)我理解这个问题和解决方案,但是在做任何修复之前,我想用单元测试来暴露错误。当我努力的时候,我意识到这很难。各种堆栈交换答案表明,我必须控制线程的执行以暴露错误,任何人为的计时都不一定能移植到不同的机器上。这似乎是我试图解决的问题之外的许多偶然的复杂性。

相反,我尝试使用the best static analysis (SA) tool for python,PyLint,看看它是否能找出任何错误,但它不能。为什么人类可以通过代码审查(本质上是SA)找到错误,但是SA工具却不能?

由于害怕尝试get Valgrind working with python(听起来像是在刮牦牛毛),我决定在不首先复制虫子的情况下大干一场。现在我陷入困境。

这是密码。

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
    '''
    Loosely models a clockwork kitchen timer with the following differences:
        You can start the timer with arbitrary duration (e.g. 1.2 seconds).
        The timer calls back a given function when time's up.
        Querying the time remaining has 0.1 second accuracy.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Starts the timer to count down from the given duration and call whenTimeup when time's up.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Stops the timer, preventing whenTimeup callback.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

问题

在这个代码示例的上下文中,如何公开竞争条件,修复它们,并证明它们已修复?

加分

适合于其他实现和问题的测试框架的额外点,而不是专门针对此代码。

外卖

我的收获是,重现已识别的争用条件的技术解决方案是控制两个线程的同步性,以确保它们的执行顺序将暴露一个bug。这里重要的一点是,他们已经确定了比赛条件。我发现识别竞争条件的最好方法是将代码提交给代码评审,并鼓励更多的专家对其进行分析。


Tags: the代码selftimedef错误with条件
3条回答

你可以用很多线程来测试它:

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True :
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: started\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: stopped\n" % tid)
        sys.stdout.write("[%d] remains %f\n" % ( tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

我看到了几个问题:

  • 当一个线程看到计时器没有运行并试图启动它时 代码通常会引发一个异常,原因是在 测试并启动。我觉得破例太多了。或者你可以 具有原子测试和启动功能

  • stop也会出现类似的问题。您可以实现testAndStop 功能。

  • 甚至这个来自timeRemaining函数的代码:

    if self.isRunning():
       self._timeRemaining = self.duration - self._elapsedTime()
    

    需要某种原子性,也许你需要在 测试正在运行。

如果计划在线程之间共享该类,则需要解决这些问题。

传统上,在多线程代码中强制争用条件是通过信号量完成的,因此可以强制一个线程等到另一个线程达到某个边缘条件后再继续。

例如,如果对象已经在运行,则对象有一些代码来检查是否未调用start。您可以通过执行以下操作来强制此条件以确保其行为符合预期:

  • 启动KitchenTimer
  • 在运行状态下在信号量上设置计时器块
  • 在另一个线程中启动同一个计时器
  • 捕捉AlreadyRunningError

要完成这些任务,您可能需要扩展KitchenTimer类。正式的单元测试通常使用在关键时刻被定义为阻塞的模拟对象。Mock对象是一个比我在这里要讨论的更大的主题,但是在google上搜索“python Mock对象”会找到很多文档和许多可供选择的实现。

有一种方法可以强制代码抛出AlreadyRunningError

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "waiting on _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
    thread_2 = threading.Thread(target = timer.start, args = (10, None))
    thread_2.start()
except AlreadyRunningError:
    print "AlreadyRunningError"
    timer.resume()
    timer.stop()

阅读代码,确定一些要测试的边界条件,然后考虑需要暂停计时器以强制出现该条件的位置,并添加条件、信号量、事件等以使其发生。e、 当计时器运行whenTimeUp回调时,如果另一个线程试图停止它,会发生什么情况?您可以通过使计时器在进入时立即等待来强制该条件:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

对要测试的类进行子类化并不是一种很好的测试方法:为了测试每一个类中的竞争条件,基本上必须重写所有的方法,这时就有一个很好的论据可以证明你并没有真正测试原始代码。相反,您可能会发现将信号量正确地放在KitchenTimer对象中(但默认情况下初始化为None)会更干净,并在获取或等待锁之前让您的方法检查if testRunningLock is not None:。然后,可以强制对提交的实际代码进行竞争。

一些关于Python模拟框架的阅读可能会有帮助。实际上,我不确定mock是否有助于测试这段代码:它几乎完全是自包含的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过这些,但是这些文档是一个很好的开始:

测试线程(un)安全代码的最常见的解决方案是启动许多线程并希望达到最佳效果。我和我可以想象其他人,都有一个问题,那就是它依赖于机会,它使测试变得“沉重”。

前一阵子我碰到这个问题时,我想追求精确而不是暴力。其结果是一段测试代码,通过让线程并驾齐驱来引起竞争条件。

赛马代码示例

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

如果从多个线程调用set_spam,则在修改和使用spam之间存在竞争条件。让我们试着始终如一地复制它。

如何造成比赛条件

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

然后演示此线程的使用:

spam = []  # Use a list to share values across threads.
results = []  # Register the results.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquires the lock.
        # Set 'spam' to thread name
        spam[:] = [thread.name]
    # Thread 'releases' the lock upon exiting the context.
    # The next thread is triggered and this thread waits for a trigger.
    with thread:
        # Since each thread overwrites the content of the 'spam'
        # list, this should only result in True for the last thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"

我想我对这个结构解释得够多了,所以你可以根据自己的情况来实现它。我认为这很适合“额外积分”部分:

extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.

解决比赛条件

共享变量

每一个线程问题都以它自己的特定方式解决。在上面的例子中,我通过在线程之间共享一个值导致了一个竞争条件。使用全局变量(如模块属性)时可能会出现类似的问题。解决这些问题的关键可能是使用线程本地存储:

# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = []  # This list only exists in this thread.
results = []  # Results *are* shared though.

def set_spam():
    thread = threading.current_thread()
    # 'get' or set the 'spam' list. This actually creates a new list.
    # If the list was shared among threads this would cause a race-condition.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Start the threads as in the example above.

assert all(results)  # All results should be True.

并发读/写

一个常见的线程问题是多个线程同时读取和/或写入数据保持器的问题。这个问题是通过实现读写锁来解决的。读写锁的实际实现可能不同。您可以选择读优先锁、写优先锁或随机锁。

我相信有一些例子描述了这种锁定技术。我可以稍后再写一个例子,因为这已经是一个很长的答案了。;-)

注释

看一看the threading module documentation并用它做一些实验。由于每个线程问题都不同,因此应用不同的解决方案。

在谈到线程时,请看一下Python GIL(全局解释器锁)。需要注意的是,线程可能不是优化性能的最佳方法(但这不是您的目标)。我觉得这个演示很不错:https://www.youtube.com/watch?v=zEaosS1U5qY

相关问题 更多 >