学习Python中的Queue模块(如何运行)

3 投票
2 回答
20485 浏览
提问于 2025-04-17 14:03

最近我接触到了队列的设计,主要是关于如何延迟处理和实现“先进先出”(FIFO)等功能。

我查看了相关文档,想要做一个简单的队列来理解如何在自己的设计或程序中实现它。但是我在运行这段代码时遇到了问题:

import queue

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

def main():

    q = queue.Queue(maxsize=0)
    for i in range(num_worker_threads):
         t = Thread(target=worker)
         t.daemon = True
         t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done

main()

我想请人解释一下这些for循环在做什么,我在运行代码时就出现了错误,所以我肯定是漏掉了什么。

出现的问题错误是: NameError: global name 'num_worker_threads' is not defined

感谢一位 -Python 新手- 的帮助。

2 个回答

3

你可以把工作线程的数量想象成银行里的柜员数量。就像人们(你的项目)在排队(你的队列)等着银行柜员(你的工作线程)来处理他们的事务。队列实际上是一种简单且易于理解的方式,用来管理线程中的复杂性。

我稍微调整了一下你的代码,来展示它是如何工作的。

import queue
import time
from threading import Thread

def do_work(item):
    print("processing", item)

def source():
    item = 1
    while True:
        print("starting", item)
        yield item
        time.sleep(0.2)
        item += 1

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = queue.Queue(maxsize=0)
def main():
    for i in range(2):
        t = Thread(target=worker)
        t.daemon = True
        t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done

main()
23

这个for循环会启动多个工作线程来执行名为“worker”的函数。下面是可以在你的系统上运行的Python 2.7的代码。

import Queue
import threading

# input queue to be processed by many threads
q_in = Queue.Queue(maxsize=0)

# output queue to be processed by one thread
q_out = Queue.Queue(maxsize=0)

# number of worker threads to complete the processing
num_worker_threads = 10

# process that each worker thread will execute until the Queue is empty
def worker():
    while True:
        # get item from queue, do work on it, let queue know processing is done for one item
        item = q_in.get()
        q_out.put(do_work(item))
        q_in.task_done()

# squares a number and returns the number and its square
def do_work(item):
    return (item,item*item)

# another queued thread we will use to print output
def printer():
    while True:
        # get an item processed by worker threads and print the result. Let queue know item has been processed
        item = q_out.get()
        print "%d squared is : %d" % item
        q_out.task_done()

# launch all of our queued processes
def main():
    # Launches a number of worker threads to perform operations using the queue of inputs
    for i in range(num_worker_threads):
         t = threading.Thread(target=worker)
         t.daemon = True
         t.start()

    # launches a single "printer" thread to output the result (makes things neater)
    t = threading.Thread(target=printer)
    t.daemon = True
    t.start()

    # put items on the input queue (numbers to be squared)
    for item in range(10):
        q_in.put(item)

    # wait for two queues to be emptied (and workers to close)   
    q_in.join()       # block until all tasks are done
    q_out.join()

    print "Processing Complete"

main()

这是根据@handle提供的Python 3版本的代码。

import queue 
import threading

# input queue to be processed by many threads
q_in = queue.Queue(maxsize=0) 

# output queue to be processed by one thread
q_out = queue.Queue(maxsize=0) 

# number of worker threads to complete the processing
num_worker_threads = 10

# process that each worker thread will execute until the Queue is empty
def worker():
    while True:
        # get item from queue, do work on it, let queue know processing is done for one item
        item = q_in.get()
        q_out.put(do_work(item))
        q_in.task_done()

# squares a number and returns the number and its square
def do_work(item):
    return (item,item*item)

# another queued thread we will use to print output
def printer():
    while True:
        # get an item processed by worker threads and print the result. Let queue know item has been processed
        item = q_out.get()
        print("{0[0]} squared is : {0[1]}".format(item) )
        q_out.task_done()

# launch all of our queued processes
def main():
    # Launches a number of worker threads to perform operations using the queue of inputs
    for i in range(num_worker_threads):
         t = threading.Thread(target=worker)
         t.daemon = True
         t.start()

    # launches a single "printer" thread to output the result (makes things neater)
    t = threading.Thread(target=printer)
    t.daemon = True
    t.start()

    # put items on the input queue (numbers to be squared)
    for item in range(10):
        q_in.put(item)

    # wait for two queues to be emptied (and workers to close)   
    q_in.join()       # block until all tasks are done
    q_out.join()

    print( "Processing Complete" )

main()

撰写回答