Python 线程:我遗漏了什么?(task_done() 调用次数过多)

3 投票
2 回答
10670 浏览
提问于 2025-04-17 02:25

抱歉一开始发了这么长的内容。希望这些信息能帮助找到解决办法。我试着写了一个工具函数,可以把任意数量的旧的 classmethod 放进一个多线程的队列里:

class QueuedCall(threading.Thread):

    def __init__(self, name, queue, fn, args, cb):
        threading.Thread.__init__(self)
        self.name = name

        self._cb = cb
        self._fn = fn
        self._queue = queue
        self._args = args

        self.daemon = True
        self.start()

    def run(self):
        r = self._fn(*self._args) if self._args is not None \
            else self._fn()

        if self._cb is not None:
            self._cb(self.name, r)

            self._queue.task_done()

这是我在一个类里调用的代码:

data = {}
def __op_complete(name, r):
    data[name] = r

q = Queue.Queue()

socket.setdefaulttimeout(5)

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
q.put(QueuedCall('so_answers', q, StackExchange.get_answers,
    ['api.stackoverflow.com', 534476, 5], __op_complete))
q.put(QueuedCall('so_user', q, StackExchange.get_user_info,
    ['api.stackoverflow.com', 534476], __op_complete))
q.put(QueuedCall('p_answers', q, StackExchange.get_answers,
    ['api.programmers.stackexchange.com', 23901, 5], __op_complete))
q.put(QueuedCall('p_user', q, StackExchange.get_user_info,
    ['api.programmers.stackexchange.com', 23901], __op_complete))
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete))

q.join()
return data

我遇到的问题是,这个代码在服务器刚重启的时候总是能正常工作,但在每第二或第三次请求时就会出错,错误信息是:

ValueError: task_done() called too many times

这个错误会在随机的线程中出现,每第二或第三次请求就会发生,所以很难准确找出问题出在哪里。

有没有人有什么想法或建议呢?

谢谢。


补充:

我为了调试这个问题加了一些 print 语句(比较简单粗暴,不是用日志记录)。在 run 的第一行加了一个打印语句 (print 'running thread: %s' % self.name),还有在调用 task_done() 之前又加了一个 (print 'thread done: %s' % self.name)。

成功请求的输出:

running thread: twitter
running thread: so_answers
running thread: so_user
running thread: p_answers
thread done: twitter
thread done: so_user
running thread: p_user
thread done: so_answers
running thread: fb_image
thread done: p_answers
thread done: p_user
thread done: fb_image

失败请求的输出:

running thread: twitter
running thread: so_answers
thread done: twitter
thread done: so_answers
running thread: so_user
thread done: so_user
running thread: p_answers
thread done: p_answers
Exception in thread p_answers:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run
    self._queue.task_done()
  File "/usr/lib/python2.7/Queue.py", line 64, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

running thread: p_user
thread done: p_user
running thread: fb_image
thread done: fb_image

2 个回答

3

我可能理解错了,但我觉得你可能没有正确使用Queue

根据我简单浏览文档的结果,Queue的用法是这样的:你可以用put方法把任务放进Queue里,然后另一个线程可以调用get从队列中取出任务,完成任务后再调用task_done表示任务完成。

但你的代码似乎是在把QueuedCall的实例放进队列里。队列里从来没有被get过,但QueuedCall的实例还会获得一个指向它们被放入的队列的引用,然后它们就开始做自己的工作(这部分工作是它们自己知道的,而不是从队列中get到的),最后调用task_done

如果我理解的没错(而且你没有在其他地方调用get方法),那么我想我明白问题所在了。

问题在于,QueuedCall的实例必须在放入队列之前创建,而创建一个实例会在另一个线程中开始它的工作。如果这个线程在主线程还没来得及把QueuedCall放入队列之前就完成了工作并调用了task_done,那么你就会看到那个错误。

我觉得这只有在你第一次运行时偶然发生。全局解释锁(GIL)在这里“帮了你”的忙;QueuedCall线程不太可能立即获得GIL并开始运行。你其实并不在乎队列,只把它当作计数器,这也让它看起来能正常工作:只要队列不为空,QueuedCall是否已经放入队列并不重要(这个QueuedCall可以直接task_done队列中的另一个元素,而等到那个元素调用task_done时,希望这个QueuedCall已经在队列里了,这样就能被标记为完成)。而且添加sleep也让新线程等一会儿,给主线程留出时间确保它们确实在队列里,这也是为什么这个问题被掩盖了。

另外,根据我在交互式命令行中快速试验的结果,你的队列在最后实际上还是满的,因为你从来没有get出任何东西。它只是收到了和放入数量一样多的task_done消息,所以join能正常工作。

我觉得你需要彻底重新设计QueuedCall类的工作方式,或者使用不同的同步机制,而不是QueueQueue是为了让已经存在的工作线程排队工作而设计的。在构造一个放入队列的对象时启动一个线程并不是一个好的选择。

7

你对这个问题的处理方式有点“特别”。不过先不谈这个……问题的关键在于你给出的代码

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))

里面很明显可能会出现以下的工作流程:

  1. 一个线程是由 QueuedCall.__init__ 创建并启动的。
  2. 然后它被放入队列 q 中。但是……在队列完成插入这个项目的逻辑之前,这个独立的线程已经完成了它的工作,并试图调用 q.task_done()。这就导致了你遇到的错误(在对象安全放入队列之前就调用了 task_done())。

那么应该怎么做呢?你不应该把线程放入队列。队列是用来存放线程要处理的数据的。所以你应该:

  • 创建一个队列。把你想要完成的任务放进去(比如函数、它们需要的参数和回调)。
  • 创建并启动工作线程。
  • 一个工作线程会调用:
    • q.get() 来获取要执行的函数。
    • 执行这个函数。
    • 调用 q.task_done() 来告诉队列这个项目已经处理完了。

撰写回答