Python 线程/队列问题

2 投票
2 回答
5360 浏览
提问于 2025-04-15 19:29

我正在写一个多线程的Python脚本,这个脚本会把一堆文件放进一个队列里,然后启动几个线程(默认是3个)来下载这些文件。每当一个线程完成下载时,它会更新控制台显示的队列状态和下载进度百分比。所有文件都在下载,但第三个线程的状态信息显示得不对,我不太明白为什么。 我在考虑创建一个工作完成的队列来计算进度,但我觉得这样做可能没必要,也不一定会有帮助。有人能给我一些建议吗?

download_queue = queue.Queue()

class Downloader(threading.Thread):
    def __init__(self,work_queue):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = work_queue.qsize()

    def run(self):
        while self.work_queue.qsize() > 0:
            url = self.work_queue.get(True)
            system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
            os.system(system_call)
            self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
            self.percent = (self.current_job / self.queue_size) * 100
            sys.stdout.flush()
            status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent,2)) + "%]"
        finally:
            self.work_queue.task_done()
def main:
    if download_queue.qsize() > 0:
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue)
            downloader.start()
        download_queue.join()

2 个回答

2

如果你想使用 multiprocessing 这个模块,它里面有一个很不错的并行处理功能 imap_unordered,这样可以让你的问题变得非常简单优雅:

import multiprocessing, sys

class ParallelDownload:
    def __init__(self, urls, processcount=3):
        self.total_items = len(urls)
        self.pool = multiprocessing.Pool(processcount)
        for n, status in enumerate(self.pool.imap_unordered(self.download, urls)):
            stats = (n, self.total_items, n/self.total_items)
            sys.stdout.write(status + " [%d/%d = %0.2f %%]\n"%stats)


    def download(self, url):
        system_call = "wget -nc -q {0} -O {1}".format(url, local_file)
        os.system(system_call)
        status = "\rDownloaded " + url.split('/')[-1]
        return status
4

你不能先检查队列的大小,然后再从队列中取出一个元素。因为在这段时间里,队列的状态可能已经发生了变化。调用 .get() 方法就是你需要的唯一原子操作。如果它抛出 Empty 异常或者阻塞,那么说明队列是空的。

你的线程可能会互相覆盖输出。我建议你再创建一个线程,专门负责从输入队列中打印内容到标准输出。这个线程还可以统计完成的项目数量,并提供状态信息。

我通常不去继承 Thread 类,而是直接使用一个普通的 Thread 实例,给它传入 target= 参数,然后调用 .start() 来启动线程。

根据你的反馈,可以试试这个:

download_queue = queue.Queue()


class Downloader(threading.Thread):
    def __init__(self,work_queue, original_size):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = original_size

    def run(self):
        while True:
            try:
                url = self.work_queue.get(False)
                system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
                os.system(system_call)
                # the following code is questionable. By the time we get here,
                #   many other items may have been taken off the queue. 
                self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
                self.percent = (self.current_job / self.queue_size) * 100
                sys.stdout.flush()
                status = ("\rDownloading " + url.split('/')[-1] + 
                          " [status: " + str(self.current_job) + 
                          "/" + str(self.queue_size) + ", " + 
                          str(round(self.percent,2)) + "%]" )            
            except queue.Empty:
                pass
            finally: 
                self.work_queue.task_done()




def main:
    if download_queue.qsize() > 0:
        original_size = download_queue.qsize()
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue, original_size)
            downloader.start()
        download_queue.join()

撰写回答