Python关闭多个线程

0 投票
1 回答
910 浏览
提问于 2025-04-17 10:02

我在使用IBM网站上的一个例子。发现DatamineThread()和ThreadUrl()这两个线程一直处于打开状态,因为它们里面有个while循环。

我想结束这些线程,并打印一些文字来告诉我它们已经结束了。我不确定我这样做是否正确,或者这些线程是否真的需要这样结束。问题是,当我在main()函数中把run设置为False时,while循环却还是在读取run为True。

如果能帮我就太好了...谢谢

import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()
run = True

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        global run
        while run:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

        print 'ThreadUrl finished...'


class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        global run
        while run:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

        print 'DatamineThread finished...'

start = time.time()
def main():
    global run
    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

    # try and break while-loops in threads
    run = False

    time.sleep(5)


main()
print "Elapsed Time: %s" % (time.time() - start)

1 个回答

4

我个人不是很喜欢使用全局变量来处理线程的状态,主要是因为我之前见过你遇到的问题。这个问题的原因可以从Python文档中的Queue.get方法找到。

如果可选参数block为真,并且timeout为None(默认值),那么在有项目可用之前,会一直阻塞。

简单来说,你没有看到对while run:的第二次检查,因为out_queue.get()在队列空的时候会一直阻塞,导致程序停在那里。

我认为更好的做法是使用哨兵值(sentinel values)在队列中,或者使用get_nowait方法并捕获异常来打破循环。下面是一些例子:

哨兵值

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            data = self.out_queue.get()
            if data == "time to quit": break
            # non-sentinel processing here.

尝试 / 捕获异常

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            try:
                data = self.out_queue.get_nowait() # also, out_queue.get(False)
            except Queue.Empty: break
            # data processing here.

为了确保所有线程都能结束,可以有几种方法:

为每个工作线程添加哨兵值

for i in range(numWorkers):
  out_queue.put('time to quit')

out_queue.join()

替换哨兵值

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            data = self.out_queue.get()
            if data == "time to quit": 
                self.out_queue.put('time to quit')
                break
            # non-sentinel processing here.

这两种方法都可以工作。选择哪种方法取决于你的out_queue是如何填充的。如果工作线程可以添加或移除队列中的项目,第一种方法更好。你可以先调用join(),然后添加哨兵值,再次调用join()。第二种方法适合你不想记住创建了多少个工作线程的情况——它只使用一个哨兵值,不会让队列变得复杂。

撰写回答