需要验证:可清除的Python队列类

2 投票
2 回答
752 浏览
提问于 2025-04-16 13:56

因为我对Python和多线程编程不是很专业,所以想请教一下我的实现是否正确。

我的目标是扩展Queue类,让它可以被清空。而且被移除的项目应该能够返回。这就是我的实现:

import Queue

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def clear(self):
        self.mutex.acquire()

        copyOfRemovedEntries = list(self.queue)
        self.queue.clear()
        self.unfinished_tasks = 0
        self.all_tasks_done.notifyAll()
        self.not_full.notifyAll()

        self.mutex.release()

        return copyOfRemovedEntries

这样做对吗?谢谢。

更新:不幸的是,这个实现还是不够完善,因为在调用clear()之后,task_done可能会抛出ValueError异常。

更具体地说:这个队列是为了在多线程环境中使用的。假设有一个生产者线程和一个工作线程(当然你也可以考虑更多线程)。通常情况下,如果工作线程调用get(),那么在工作完成后应该调用task_done()。如果是这样的话,可能会发生这样的情况:生产者线程在工作线程调用get()之后、task_done()之前出于某种原因调用了clear()。到目前为止,这样是可以的,但是如果工作线程想要调用task_done(),就会抛出异常。这是因为task_done()会通过检查Queue类的unfinished_tasks来查看还有多少未完成的任务。

如果这个问题能够仅由ClearableQueue类来处理,那就很有意思了,这样就可以放心地调用clear()方法。或者是否需要其他东西来控制方法调用。

实际上,在我的具体情况下,我并没有使用join()方法,所以我不需要调用task_done()。不过,我想让这个功能更完整,这样对其他人也可能有用。

2 个回答

3

如果你查看这个源代码,你会发现,标准的访问方式是把可能会改变数据的代码放在一个try: finally的结构里,这样如果出现问题也能处理。

import Queue

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def clear(self):
        self.mutex.acquire()

        copyOfRemovedEntries = None
        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

编辑 1

如果你担心在调用get()task_done()时,第二个线程会抛出异常,那为什么不把task_done()放在一个try-catch的结构里呢?这个异常只是告诉你,你已经确认了太多的项目,但如果你的清理函数已经处理过这些项目,那问题出在哪里呢?

这样做可以隐藏那个异常,如果它让你烦恼的话,同时也让函数的意图更加明确,还能去掉我之前例子中的重复列表赋值。

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def get_all(self)
        self.mutex.acquire()

        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

    def clear(self):
        self.get_all()

    def task_done(self):
        try:
            Queue.Queue.task_done(self)
        except ValueError:
            pass

编辑 2

那这样做是不是一个更有效的解决方案呢?它不会隐藏任何东西:

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)
        self.tasks_cleared = 0

    def get_all(self)
        self.mutex.acquire()

        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
            self.tasks_cleared += len(copyOfRemovedEntries)
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

    def clear(self):
        self.get_all()

    def task_done(self):
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks + self.tasks_cleared - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished - self.tasks_cleared
            self.tasks_cleared = 0
        finally:
            self.all_tasks_done.release() 

我认为这样做应该可以避免异常,同时仍然保持原来类的预期行为。

1

你似乎遇到了一种竞争条件的问题。如果我理解得没错,现在的情况是你有时会遇到:

T1: |----->|------------->|-------------->|
    | get  |    some_opp  | task_done     |
T2: |---------->|------>|---------------->|
    | other_opp | clear | yet_another_opp |

在这个情况下,gettask_done中都进行了清除操作,这会导致程序崩溃。根据我的理解,你需要找到一种方法来解决这个问题:

T1: |----->|------------->|-------------->|
    | get  |    some_opp  | task_done     |
T2: |---------->|------------------------>|------>|
    | other_opp | wait_for_task_done      | clear |

如果我说得对,你可能需要一个第二把锁,由get设置,由task_done释放,这样就可以表示“这个队列不能被清除”。然后,你可能还需要有一个不执行清除操作的gettask_done版本,用于那些你非常确定自己在做什么的特殊情况。

另一种解决方案是使用一个更原子化的锁,这样你就可以这样做:

T1: |----->|------------------->|-------------->|------------->|
    | get  |    some_opp        | task_done     | finish_clear |
T2: |---------->|-------------->|---------------->|
    | other_opp | partial_clear | yet_another_opp |

在这里,你可以说“我还没有完成这个任务,但你可以清除其他的”,然后告诉task_done这个任务尝试过被清除,所以它应该在之后做点什么。不过,这样的做法开始变得相当复杂了。

撰写回答