Python 在不阻塞父进程的情况下加入子进程

23 投票
5 回答
17759 浏览
提问于 2025-04-16 13:06

我正在写一个程序,这个程序会监视一个特定的文件夹,查看里面是否有新的文件,这些文件里包含下载链接。一旦发现了新文件,程序就会创建一个新的进程来实际下载文件,而父进程则继续监视这个文件夹。我使用的是来自 multiprocessingProcess 接口。现在我遇到的问题是,如果我不调用 process.join(),子进程会一直运行,但 process.join() 是一个阻塞函数,这样就失去了创建子进程来处理下载的意义。

我的问题是,有没有办法以非阻塞的方式来等待子进程结束,这样父进程就可以继续做其他事情?

部分代码:

def main(argv):
  # parse command line args
  ...
  # set up variables
  ...
  watch_dir(watch_dir, download_dir)


def watch_dir(wDir, dDir):
  # Grab the current watch directory listing
  before = dict([(f, None) for f in os.listdir (wDir)])

  # Loop FOREVER
  while 1:
    # sleep for 10 secs
    time.sleep(10)

    # Grab the current dir listing
    after = dict([(f, None) for f in os.listdir (wDir)])

    # Get the list of new files
    added = [f for f in after if not f in before]
    # Get the list of deleted files
    removed = [f for f in before if not f in after]

    if added:
      # We have new files, do your stuff
      print "Added: ", ", ".join(added)

      # Call the new process for downloading
      p = Process(target=child, args=(added, wDir, dDir))
      p.start()
      p.join()

    if removed:
      # tell the user the file was deleted
      print "Removed: ", ", ".join(removed)

    # Set before to the current
    before = after

def child(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file and to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

父进程在调用 p.join() 之前会等待子进程完成执行,然后才继续,这在我看来是正确的。但这就违背了创建子进程的初衷。如果我不调用 p.join(),那么子进程会保持活跃状态,使用 ps ax | grep python 会显示 'python <defunct>'。

我希望子进程能完成它的工作后退出,而不影响父进程的运行。有没有办法做到这一点呢?

5 个回答

3

与其费力地让 multiprocessing.Process() 为你工作,不如换个工具,比如用 apply_async() 和 multiprocessing.Pool()。

def main(argv):
    # parse command line args
    ...
    # set up variables
    ...

    # set up multiprocessing Pool
    pool = multiprocessing.Pool()

    try:
        watch_dir(watch_dir, download_dir, pool)

    # catch whatever kind of exception you expect to end your infinite loop
    # you can omit this try/except if you really think your script will 
    # run "forever" and you're okay with zombies should it crash
    except KeyboardInterrupt:
        pool.close()
        pool.join()

def watch_dir(wDir, dDir, pool):
    # Grab the current watch directory listing
    before = dict([(f, None) for f in os.listdir (wDir)])

    # Loop FOREVER
    while 1:
        # sleep for 10 secs
        time.sleep(10)

        # Grab the current dir listing
        after = dict([(f, None) for f in os.listdir (wDir)])

        # Get the list of new files
        added = [f for f in after if not f in before]
        # Get the list of deleted files
        removed = [f for f in before if not f in after]

        if added:
            # We have new files, do your stuff
            print "Added: ", ", ".join(added)

            # launch the function in a subprocess - this is NON-BLOCKING
            pool.apply_async(child, (added, wDir, dDir))

        if removed:
            # tell the user the file was deleted
            print "Removed: ", ", ".join(removed)

        # Set before to the current
        before = after

def child(filename, wDir, dDir):
    # Open filename and extract the url
    ...
    # Download the file and to the dDir directory
    ...
    # Delete filename from the watch directory
    ...
    # simply return to "exit cleanly"
    return

multiprocessing.Pool() 就像是一组可以同时工作的子进程,你可以把“任务”提交给它们。使用 pool.apply_async() 这个函数时,它会让其中一个子进程去运行你指定的函数,并且使用你提供的参数,这个过程是异步的,也就是说你不需要等这个子进程完成就可以继续执行脚本,直到所有工作完成后再关闭整个池子。这个库会帮你处理很多细节。

我觉得这个方法比目前的答案更好,原因有以下几点:
1. 它省去了启动额外线程和队列的复杂性,这些本来只是为了管理子进程。
2. 它使用的是专门为这个目的设计的库函数,所以你可以享受到未来库的改进带来的好处。
3. 在我看来,它更容易维护。
4. 它更灵活。如果有一天你想从子进程中获取返回值,你可以把 apply_async() 调用的返回值(一个 结果对象)存起来,随时检查。你还可以把这些结果存到一个列表里,当列表的大小超过某个值时再一起处理。如果你把池子的创建放到 watch_dir() 函数里,并且不在乎“无限”循环被打断时发生什么,就可以省去 try/except。如果你在这个(目前是)无限循环里加上某种退出条件,循环结束后只需加上 pool.close()pool.join(),这样一切就能清理干净。

7

在你的while循环中,调用

multiprocessing.active_children()

这个函数会返回当前进程所有还在运行的子进程的列表。调用这个函数的一个副作用是,它会“合并”那些已经结束的进程。

16

你可以设置一个单独的线程来处理连接(join)的工作。让这个线程监听一个队列,你可以把子进程的句柄放进去:

class Joiner(Thread):
    def __init__(self, q):
        self.__q = q
    def run(self):
        while True:
            child = self.__q.get()
            if child == None:
                return
            child.join()

然后,不要直接使用p.join(),而是用joinq.put(p)把子进程放入队列,并且用joinq.put(None)来告诉线程可以停止了。记得要使用先进先出的队列(FIFO)。

撰写回答