在Python 2.6中使用队列类

3 投票
2 回答
2723 浏览
提问于 2025-04-15 21:40

假设我现在只能用Python 2.6,没法升级(即使升级会有帮助)。我写了一个程序,里面用到了队列(Queue)这个类。我的生产者是一个简单的目录列表,而消费者线程则从队列中取出文件,然后对这些文件进行处理。如果文件已经处理过了,我就跳过它。处理过的文件列表是在所有线程启动之前生成的,所以这个列表不是空的。

下面是一些伪代码。

import Queue, sys, threading

processed = []

def consumer():
    while True:
        file = dirlist.get(block=True)
        if file in processed:
            print "Ignoring %s" % file
        else:
            # do stuff here
        dirlist.task_done()

dirlist = Queue.Queue()

for f in os.listdir("/some/dir"):
    dirlist.put(f)

max_threads = 8

for i in range(max_threads):
    thr = Thread(target=consumer)
    thr.start()

dirlist.join()

我遇到的奇怪情况是,如果某个线程碰到一个已经处理过的文件,这个线程就会停下来,等到整个程序结束才继续。经过一些测试,我发现前7个线程(假设最多只能有8个线程)都停了,而第8个线程则一个一个地继续处理文件。但是这样一来,我就失去了使用多线程的意义。

我是不是做错了什么,还是说在Python 2.6中,队列和线程的行为就是这样?

2 个回答

2

我试着运行了你的代码,但没有看到你描述的那种情况。不过,程序一直没有结束。我建议你把 .get() 的调用改成这样:

    try:
        file = dirlist.get(True, 1)
    except Queue.Empty:
        return

如果你想知道当前哪个线程在执行,可以导入 thread 模块,然后打印 thread.get_ident()

我在 .get() 后面加了以下这一行:

    print file, thread.get_ident()

然后得到了以下输出:

bin 7116328
cygdrive 7116328
 cygwin.bat 7149424
cygwin.ico 7116328
 dev etc7598568
7149424
 fix 7331000
 home 7116328lib
 7598568sbin
 7149424Thumbs.db
 7331000
tmp 7107008
 usr 7116328
var 7598568proc
 7441800

输出看起来有点乱,因为多个线程同时在写入标准输出。不同的线程标识符进一步确认了所有线程都在运行。

也许你的实际代码或者测试方法有问题,但你发的这段代码应该没问题吧?

1

因为这个问题只在寻找已经处理过的文件时出现,所以看起来和 processed 列表本身有关。你有没有尝试过实现一个简单的锁?比如:

processed = []
processed_lock = threading.Lock()

def consumer():
    while True:
        with processed_lock.acquire():
            fileInList = file in processed
        if fileInList:
            # ... et cetera

多线程往往会导致一些奇怪的错误,即使这些错误看起来“应该”不会发生。对共享变量使用锁是确保你不会遇到某种竞争条件的第一步,这种情况可能会导致线程死锁。


当然,如果你在 # do stuff here 下做的事情是占用 CPU 资源的,那么由于全局解释器锁的原因,Python 实际上一次只能运行一个线程的代码。在这种情况下,你可能想要切换到 multiprocessing 模块——它和 threading 非常相似,不过你需要用其他方法来处理共享变量(详细信息可以查看 这里)。

撰写回答