multiprocessing.Pool:使用apply_async的回调选项时调用助手函数

4 投票
1 回答
4856 浏览
提问于 2025-04-18 15:35

在调用可迭代的函数和回调函数之间,apply_async的流程是怎样的呢?

背景:我正在读取一个有2000个文件的目录中的所有文件的几行内容,有些文件有几百万行,有些只有几行。我提取了一些头部信息、格式和日期数据来描述每个文件。因为我在一台有16个CPU的机器上工作,所以使用多进程处理是有意义的。

目前,我把期望的结果发送到一个列表(ahlala)中,以便我可以打印出来;之后,这些结果会写入*.csv文件。这是我代码的简化版本,最初是基于这篇非常有帮助的帖子

import multiprocessing as mp

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count()
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

注意,如果我把所有的Z()这个辅助函数放到X()Y()results()中,代码是可以正常工作的,但这样做是不是显得重复,或者可能会比其他方式慢呢?我知道回调函数会在每次函数调用后被调用,但它到底是什么时候被调用的呢?是在pool.apply_async()完成所有进程的工作之后吗?如果这些辅助函数在pool.apply_async()的第一个函数(在这个例子中是X())的范围内被调用,不应该更快吗?如果这样做不行,我是不是应该把辅助函数放在results()里呢?

其他相关想法:是不是因为守护进程的原因,什么都没有显示出来?我对如何排队处理也很困惑,这是否是问题所在。这似乎是一个学习的起点,但是在使用apply_async时,排队处理可以安全忽略吗,还是会导致明显的时间效率低下呢?

1 个回答

9

你问了很多不同的问题,我会尽量一一解答:

你传给 callback 的函数会在主进程中执行(不是在工作进程中),一旦工作进程返回结果,它就会被执行。这个函数是在 Pool 对象内部创建的一个线程中运行的。这个线程会从 result_queue 中获取结果,result_queue 是用来收集所有工作进程的结果的。当线程从队列中取出结果后,就会执行 callback。在你的回调函数执行期间,其他结果无法从队列中取出,所以回调函数执行得越快越好。举个例子,当你通过 apply_async 调用的 XY 中的一个完成时,工作进程会把结果放入 result_queue,然后结果处理线程会从 result_queue 中取出结果,接着执行你的 callback

其次,我猜测你没有看到任何结果的原因可能是因为你的工作函数调用都失败了。如果工作函数失败了,callback 就不会被执行。这个失败不会被报告,除非你尝试从调用 apply_async 返回的 AsyncResult 对象中获取结果。不过,由于你没有保存这些对象,你永远也不会知道发生了失败。如果我是你,我会在测试时使用 pool.apply,这样可以在出错时立即看到错误。

你提供的示例代码中,工作进程可能失败的原因是因为 XY 是在另一个函数内部定义的。multiprocessing 通过在主进程中将函数和对象进行“打包”,然后在工作进程中“解包”来传递它们。嵌套在其他函数内部的函数是无法被“打包”的,这意味着 multiprocessing 无法在工作进程中成功“解包”它们。要解决这个问题,你应该把这两个函数定义在模块的顶层,而不是嵌套在 dirwalker 函数内部。

你绝对应该继续在 XY 中调用 Z,而不是在 results 中。这样,Z 就可以在所有工作进程中并行运行,而不是在主进程中一次一个地运行。而且记住,你的 callback 函数应该尽可能快,这样就不会拖慢结果处理的速度。如果在这里执行 Z 会让事情变得更慢。

这里有一些简单的示例代码,和你正在做的事情类似,希望能给你一个代码应该是什么样子的概念:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .append, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                pool.apply_async(X, args=(f,), callback=results)  # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

输出:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]

编辑:

你可以使用 multiprocessing.Manager 类创建一个在父进程和子进程之间共享的字典对象:

pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

然后让 XY 接受一个名为 helper_dict 的第二个参数(或者你想要的任何名字),这样就可以了。

需要注意的是,这种方法是通过创建一个包含普通字典的服务器进程来实现的,所有其他进程通过代理对象与这个字典进行通信。所以每次你读或写这个字典时,都是在进行进程间通信(IPC)。这会比真正的字典慢很多。

撰写回答