获取pool.apply_async()结果时出现TypeError

1 投票
1 回答
1891 浏览
提问于 2025-04-18 17:19

问题

我遇到了一个异常,但不知道是什么原因引起的,希望能得到帮助解决这个问题。

背景

我在使用 Python 2.7.6 创建了一组工作进程,用来异步执行多个函数。在关闭并合并工作进程后,我检查 ApplyResult 对象,以确保所有函数都成功执行。当我尝试获取结果时,出现了以下错误:

追踪(最近的调用在最前面):
  文件 "parse.py",第 798 行,
    main()
  文件 "parse.py",第 769 行,
    produce_output_files(args.output_dir)
  文件 "parse.py",第 524 行,
    print(result.get())
  文件 "/user/Python-2.7.6/lib/python2.7/multiprocessing/pool.py",第 554 行,
    raise self._value
类型错误:foo1() 的参数必须是一个映射,而不是 AcquirerProxy

这是我用来启动子进程的代码:

def produce_output_files(output_dir):

    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    db_lock = manager.Lock()
    results = [pool.apply_async(func, output_dir, db_lock) 
               for func in [foo1, foo2, foo3]]

    pool.close()
    pool.join()

    for result in results:
        if not result.successful():
            print(result.get())

    return

我所有的目标函数都有以下结构:

def foo1(output_dir, db_lock):
    try:

        # wrapping the whole function in a try/except block because tracebacks
        # aren't pickleable, but they can be packaged into a string for pickling

    except:
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

调试步骤

这是工作进程的异常吗?

起初,我以为我只是从工作进程那里得到了一个追踪信息,因为 AsyncResult 的文档中提到:

如果远程调用引发了异常,那么这个异常将在 get() 中重新引发。

...而我将追踪信息打包成一个字符串的方式应该会在主进程中打印出正确的追踪信息。为了测试这一点,我将调用的函数改成了这样:

def _produce_C(output_dir, db_lock):
    raise Exception("test")

这个测试结果还是得到了相同的追踪信息,所以我知道我并不是在打印工作进程中的异常(“test”从未被打印)。我认为这个异常是因为我获取结果的方式,而不仅仅是子进程中异常的传播。

结果还没准备好?

我也知道在我调用结果对象的 get() 时,结果已经准备好了,因为我已经关闭并合并了工作进程。为了确认这一点,我将我的 for 循环改成了这样:

    for result in results:
        result.wait()
        if not result.successful():
            print(result.get())

结果还是得到了相同的追踪信息。

工作进程关闭了,结果过期了吗?

我最后一次尝试修复这个错误是将合并工作进程和获取结果的顺序调换成这样:

    for result in results:
        result.wait()
        if not result.successful():
            print(result.get())

    pool.close()
    pool.join()

结果依然是相同的追踪信息。

其他信息

正如在 这个 Python 问题报告 中所描述的,通常 get() 方法不会生成完整的追踪信息,因为追踪信息无法被序列化。然而,在我上面提到的第一次调试测试中,如果 get() 实际上捕获了工作进程的异常,我应该仍然能在追踪信息中看到字符串 "test"。此外,我用 try/except 块包裹函数以捕获追踪信息的方法在我链接的问题报告中也被特别提到作为一种解决方法。

1 个回答

2

你需要把调用 apply_async 函数时的参数放在一个元组里

results = [pool.apply_async(func, (output_dir, db_lock)) 
               for func in [foo1, foo2, foo3]]

这样做可以解决这个异常。想想 apply_async 的定义是这样的:

def apply_async(self, func, args=(), kwds={}, callback=None):

你现在传递参数的方式,其实是在做这个:

pool.apply_async(func, args=output_dir, kwargs=db_lock)

这就解释了错误追踪信息:

TypeError: foo1() argument after ** must be a mapping, not AcquirerProxy.

它试图把 db_lock 当作 kwargs 来处理。显然这不是你想要的!

撰写回答