需要验证:可清除的Python队列类
因为我对Python和多线程编程不是很专业,所以想请教一下我的实现是否正确。
我的目标是扩展Queue类,让它可以被清空。而且被移除的项目应该能够返回。这就是我的实现:
import Queue
class ClearableQueue(Queue.Queue):
def __init__(self, maxsize):
Queue.Queue.__init__(self, maxsize)
def clear(self):
self.mutex.acquire()
copyOfRemovedEntries = list(self.queue)
self.queue.clear()
self.unfinished_tasks = 0
self.all_tasks_done.notifyAll()
self.not_full.notifyAll()
self.mutex.release()
return copyOfRemovedEntries
这样做对吗?谢谢。
更新:不幸的是,这个实现还是不够完善,因为在调用clear()之后,task_done可能会抛出ValueError异常。
更具体地说:这个队列是为了在多线程环境中使用的。假设有一个生产者线程和一个工作线程(当然你也可以考虑更多线程)。通常情况下,如果工作线程调用get(),那么在工作完成后应该调用task_done()。如果是这样的话,可能会发生这样的情况:生产者线程在工作线程调用get()之后、task_done()之前出于某种原因调用了clear()。到目前为止,这样是可以的,但是如果工作线程想要调用task_done(),就会抛出异常。这是因为task_done()会通过检查Queue类的unfinished_tasks来查看还有多少未完成的任务。
如果这个问题能够仅由ClearableQueue类来处理,那就很有意思了,这样就可以放心地调用clear()方法。或者是否需要其他东西来控制方法调用。
实际上,在我的具体情况下,我并没有使用join()方法,所以我不需要调用task_done()。不过,我想让这个功能更完整,这样对其他人也可能有用。
2 个回答
如果你查看这个源代码,你会发现,标准的访问方式是把可能会改变数据的代码放在一个try: finally的结构里,这样如果出现问题也能处理。
import Queue
class ClearableQueue(Queue.Queue):
def __init__(self, maxsize):
Queue.Queue.__init__(self, maxsize)
def clear(self):
self.mutex.acquire()
copyOfRemovedEntries = None
try:
copyOfRemovedEntries = list(self.queue)
self.queue.clear()
self.unfinished_tasks = 0
self.all_tasks_done.notifyAll()
self.not_full.notifyAll()
finally:
self.mutex.release()
return copyOfRemovedEntries
编辑 1
如果你担心在调用get()
和task_done()
时,第二个线程会抛出异常,那为什么不把task_done()
放在一个try-catch的结构里呢?这个异常只是告诉你,你已经确认了太多的项目,但如果你的清理函数已经处理过这些项目,那问题出在哪里呢?
这样做可以隐藏那个异常,如果它让你烦恼的话,同时也让函数的意图更加明确,还能去掉我之前例子中的重复列表赋值。
class ClearableQueue(Queue.Queue):
def __init__(self, maxsize):
Queue.Queue.__init__(self, maxsize)
def get_all(self)
self.mutex.acquire()
try:
copyOfRemovedEntries = list(self.queue)
self.queue.clear()
self.unfinished_tasks = 0
self.all_tasks_done.notifyAll()
self.not_full.notifyAll()
finally:
self.mutex.release()
return copyOfRemovedEntries
def clear(self):
self.get_all()
def task_done(self):
try:
Queue.Queue.task_done(self)
except ValueError:
pass
编辑 2
那这样做是不是一个更有效的解决方案呢?它不会隐藏任何东西:
class ClearableQueue(Queue.Queue):
def __init__(self, maxsize):
Queue.Queue.__init__(self, maxsize)
self.tasks_cleared = 0
def get_all(self)
self.mutex.acquire()
try:
copyOfRemovedEntries = list(self.queue)
self.queue.clear()
self.unfinished_tasks = 0
self.all_tasks_done.notifyAll()
self.not_full.notifyAll()
self.tasks_cleared += len(copyOfRemovedEntries)
finally:
self.mutex.release()
return copyOfRemovedEntries
def clear(self):
self.get_all()
def task_done(self):
self.all_tasks_done.acquire()
try:
unfinished = self.unfinished_tasks + self.tasks_cleared - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished - self.tasks_cleared
self.tasks_cleared = 0
finally:
self.all_tasks_done.release()
我认为这样做应该可以避免异常,同时仍然保持原来类的预期行为。
你似乎遇到了一种竞争条件的问题。如果我理解得没错,现在的情况是你有时会遇到:
T1: |----->|------------->|-------------->|
| get | some_opp | task_done |
T2: |---------->|------>|---------------->|
| other_opp | clear | yet_another_opp |
在这个情况下,get
和task_done
中都进行了清除操作,这会导致程序崩溃。根据我的理解,你需要找到一种方法来解决这个问题:
T1: |----->|------------->|-------------->|
| get | some_opp | task_done |
T2: |---------->|------------------------>|------>|
| other_opp | wait_for_task_done | clear |
如果我说得对,你可能需要一个第二把锁,由get
设置,由task_done
释放,这样就可以表示“这个队列不能被清除”。然后,你可能还需要有一个不执行清除操作的get
和task_done
版本,用于那些你非常确定自己在做什么的特殊情况。
另一种解决方案是使用一个更原子化的锁,这样你就可以这样做:
T1: |----->|------------------->|-------------->|------------->|
| get | some_opp | task_done | finish_clear |
T2: |---------->|-------------->|---------------->|
| other_opp | partial_clear | yet_another_opp |
在这里,你可以说“我还没有完成这个任务,但你可以清除其他的”,然后告诉task_done
这个任务尝试过被清除,所以它应该在之后做点什么。不过,这样的做法开始变得相当复杂了。