用多进程模块结束守护进程

4 投票
2 回答
9799 浏览
提问于 2025-04-16 10:35

下面我给大家举个使用多进程的例子。这是一个进程池的模型。虽然它看起来不那么简单,但结构上和我实际使用的代码比较接近。这个例子还用了sqlalchemy,抱歉。

我现在遇到的问题是,我有一个运行时间比较长的Python脚本,它执行了几个函数,每个函数的代码都像下面这样,所以在所有情况下,父进程都是一样的。换句话说,一个Python脚本创建了多个进程池。(我想我也可以不这样做,但另一种选择是使用像os.system和subprocess这样的方式。)问题是这些进程会一直存在,占用内存。文档上说这些守护进程应该会等到父进程退出后才结束,但如果父进程接着又生成了另一个进程池,且没有立即退出,那该怎么办呢?

调用terminate()可以结束这些进程,但这听起来不太礼貌。有没有什么好的方法可以礼貌地请求这些进程结束?也就是说,能不能让它们自己清理一下,然后离开,因为我需要启动下一个进程池?

我还尝试在进程上调用join()。根据文档,这意味着要等到这些进程结束。但如果它们不打算结束呢?实际上发生的情况是,进程会挂起。

提前谢谢你。

祝好,Faheem。

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s"%(func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        <--- terminate used here  
            worker.join()              <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()

2 个回答

3

我处理这个问题的方法是:

import multiprocessing

for prc in multiprocessing.active_children():
    prc.terminate()

我更喜欢这样做,因为这样就不用在工作函数里加一些if条件判断,避免让代码变得复杂。

2

你知道吗,multiprocessing 这个库已经有专门的类来管理工作线程池了,对吧?

一般来说,我们会给线程发送一个退出的信号:

queue.put(("QUIT", None, None))

然后再检查这个信号:

if func == "QUIT":
    return

撰写回答