Python多进程中的生产者/消费者问题

14 投票
3 回答
18118 浏览
提问于 2025-04-15 11:51

我正在写一个服务器程序,里面有一个生产者和多个消费者。让我困惑的是,只有第一个生产者放入队列的任务被消费,之后放入的任务就再也没有被消费,它们永远留在队列里。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

这个生产者是一个HTTP服务器,它在收到用户请求时会把任务放入队列。奇怪的是,消费者进程在队列里有新任务时似乎还是被阻塞了。

附注:还有两个问题和上面无关。我不确定把HTTP服务器放在一个单独的进程中是否更好,如果是的话,怎么才能让主进程在所有子进程结束之前保持运行。第二个问题,优雅地停止HTTP服务器的最佳方法是什么?

编辑:添加了生产者的代码,它只是一个简单的Python WSGI服务器:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()

3 个回答

4

“第二个问题,优雅地停止HTTP服务器的最佳方法是什么?”

这个问题有点复杂。

你有两种选择来进行进程间通信:

  • 第一种是“带外控制”。也就是说,服务器有其他的通信方式,比如另一个套接字、Unix信号,或者其他的方式。这个“其他的方式”可以是服务器本地目录下的一个“stop-now”文件。听起来有点奇怪,但实际上效果很好,而且比引入一个选择循环来监听多个套接字或信号处理器要简单。

    这个“stop-now”文件很容易实现。evwsgi.run()循环在每次请求后只需检查这个文件。如果你想让服务器停止,只需创建这个文件,执行一个/control请求(即使返回500错误也没关系),服务器就会停止。记得删除这个stop-now文件,否则你的服务器不会重新启动。

  • 第二种是“带内控制”。也就是说,服务器有另一个URL(/stop)可以用来停止它。乍一看,这似乎是个安全隐患,但实际上这完全取决于你打算如何使用这个服务器。因为它看起来只是一个内部请求队列的简单封装,所以这个额外的URL效果很好。

    要让这个方法有效,你需要编写自己的evwsgi.run()版本,通过设置某个变量来终止循环。

编辑

你可能不想直接终止服务器,因为你不知道它的工作线程的状态。你需要给服务器发信号,然后等它正常完成任务。

如果你想强制杀掉服务器,可以使用os.kill()(或者multiprocessing.terminate)。不过,这样做的前提是你不知道子线程在做什么。

12

我觉得可能是网络服务器那部分出了问题,因为这个运行得非常好:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

示例输出:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

撰写回答