Python多进程中的生产者/消费者问题
我正在写一个服务器程序,里面有一个生产者和多个消费者。让我困惑的是,只有第一个生产者放入队列的任务被消费,之后放入的任务就再也没有被消费,它们永远留在队列里。
from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time
def work(queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(5)
print "task done:", task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
self.workers = [Process(target=work, args=(self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
httpserv(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
queue.close()
Manager().start()
这个生产者是一个HTTP服务器,它在收到用户请求时会把任务放入队列。奇怪的是,消费者进程在队列里有新任务时似乎还是被阻塞了。
附注:还有两个问题和上面无关。我不确定把HTTP服务器放在一个单独的进程中是否更好,如果是的话,怎么才能让主进程在所有子进程结束之前保持运行。第二个问题,优雅地停止HTTP服务器的最佳方法是什么?
编辑:添加了生产者的代码,它只是一个简单的Python WSGI服务器:
import fapws._evwsgi as evwsgi
from fapws import base
def httpserv(queue):
evwsgi.start("0.0.0.0", 8080)
evwsgi.set_base_module(base)
def request_1(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_1')
return ["request 1!"]
def request_2(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_2')
return ["request 2!!"]
evwsgi.wsgi_cb(("/request_1", request_1))
evwsgi.wsgi_cb(("/request_2", request_2))
evwsgi.run()
3 个回答
“第二个问题,优雅地停止HTTP服务器的最佳方法是什么?”
这个问题有点复杂。
你有两种选择来进行进程间通信:
第一种是“带外控制”。也就是说,服务器有其他的通信方式,比如另一个套接字、Unix信号,或者其他的方式。这个“其他的方式”可以是服务器本地目录下的一个“stop-now”文件。听起来有点奇怪,但实际上效果很好,而且比引入一个选择循环来监听多个套接字或信号处理器要简单。
这个“stop-now”文件很容易实现。
evwsgi.run()
循环在每次请求后只需检查这个文件。如果你想让服务器停止,只需创建这个文件,执行一个/control
请求(即使返回500错误也没关系),服务器就会停止。记得删除这个stop-now文件,否则你的服务器不会重新启动。第二种是“带内控制”。也就是说,服务器有另一个URL(
/stop
)可以用来停止它。乍一看,这似乎是个安全隐患,但实际上这完全取决于你打算如何使用这个服务器。因为它看起来只是一个内部请求队列的简单封装,所以这个额外的URL效果很好。要让这个方法有效,你需要编写自己的
evwsgi.run()
版本,通过设置某个变量来终止循环。
编辑
你可能不想直接终止服务器,因为你不知道它的工作线程的状态。你需要给服务器发信号,然后等它正常完成任务。
如果你想强制杀掉服务器,可以使用os.kill()
(或者multiprocessing.terminate
)。不过,这样做的前提是你不知道子线程在做什么。
我觉得可能是网络服务器那部分出了问题,因为这个运行得非常好:
from multiprocessing import Process, Queue, cpu_count
import random
import time
def serve(queue):
works = ["task_1", "task_2"]
while True:
time.sleep(0.01)
queue.put(random.choice(works))
def work(id, queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(0.05)
print "%d task:" % id, task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
print "starting %d workers" % self.NUMBER_OF_PROCESSES
self.workers = [Process(target=work, args=(i, self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
serve(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
self.queue.close()
Manager().start()
示例输出:
starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1