我对multiprocessing
模块很陌生。我刚刚尝试创建了以下内容:我有一个进程的任务是从RabbitMQ获取消息并将其传递到内部队列(multiprocessing.Queue
)。然后我要做的是:当新消息传入时生成一个进程。它可以工作,但在作业完成后,会留下一个僵尸进程,而不是由其父进程终止。这是我的代码:
主要流程:
#!/usr/bin/env python
import multiprocessing
import logging
import consumer
import producer
import worker
import time
import base
conf = base.get_settings()
logger = base.logger(identity='launcher')
request_order_q = multiprocessing.Queue()
result_order_q = multiprocessing.Queue()
request_status_q = multiprocessing.Queue()
result_status_q = multiprocessing.Queue()
CONSUMER_KEYS = [{'queue':'product.order',
'routing_key':'product.order',
'internal_q':request_order_q}]
# {'queue':'product.status',
# 'routing_key':'product.status',
# 'internal_q':request_status_q}]
def main():
# Launch consumers
for key in CONSUMER_KEYS:
cons = consumer.RabbitConsumer(rabbit_q=key['queue'],
routing_key=key['routing_key'],
internal_q=key['internal_q'])
cons.start()
# Check reques_order_q if not empty spaw a process and process message
while True:
time.sleep(0.5)
if not request_order_q.empty():
handler = worker.Worker(request_order_q.get())
logger.info('Launching Worker')
handler.start()
if __name__ == "__main__":
main()
这是我的工人:
import multiprocessing
import sys
import time
import base
conf = base.get_settings()
logger = base.logger(identity='worker')
class Worker(multiprocessing.Process):
def __init__(self, msg):
super(Worker, self).__init__()
self.msg = msg
self.daemon = True
def run(self):
logger.info('%s' % self.msg)
time.sleep(10)
sys.exit(1)
因此,在处理完所有消息之后,我可以看到使用ps aux
命令的进程。但我真的希望他们一结束就被终止。
谢谢。
有几件事:
确保父对象
joins
其子对象,以避免僵尸。见Python Multiprocessing Kill Processes您可以使用
is_alive()
成员函数检查子函数是否仍在运行。见http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process使用
multiprocessing.active_children
比使用Process.join
要好。函数active_children
清除自上次调用active_children
以来创建的所有僵尸。方法join
等待所选进程。在此期间,其他进程可以终止并成为僵尸,但父进程不会注意到,直到等待的方法被加入。要查看此操作:上面将创建3个进程,每个进程间隔10秒终止。因为代码是,最后一个进程是先连接的,所以之前终止的另外两个进程将是20秒的僵尸。你可以看到他们:
如果进程按其终止的顺序等待,则不会有僵尸。删除
reversed
以查看此情况。然而,在实际的应用程序中,我们很少知道子级将终止的序列,因此使用join
将导致一些僵尸。替代的
active_children
不会留下任何僵尸。 在上面的示例中,将循环for p in reversed(c):
替换为:看看会发生什么。
使用活跃的孩子。 multiprocessing.active_children
相关问题 更多 >
编程相关推荐