在装饰器内使用多进程生成错误:无法序列化函数...未找到

6 投票
2 回答
6032 浏览
提问于 2025-04-17 06:18

我遇到了一个问题,无法解决,它与多进程有关,并且是在装饰器内部使用的。

当我使用多进程调用 run_in_parallels 方法时,出现了错误:

无法序列化 <function run_testcase at 0x00000000027789C8>: 它没有被找到为 __main__.run_testcase

这个调用发生在装饰器内部,然后就出现了上面提到的问题。当我在没有装饰器的情况下调用同样的 run_in_parallels 方法时,一切正常。

这个问题的原因是什么呢?


文件: w_PythonHelper.py

描述: 'run_in_parallel' 函数用于同时运行多个进程。第一个结束操作的方法会停止其他进程。

from multiprocessing import Process,Event

class ExtProcess(Process):
    def __init__(self, event,*args,**kwargs):
        self.event=event
        Process.__init__(self,*args,**kwargs)

    def run(self):
        Process.run(self)
        self.event.set()

class PythonHelper(object):
    @staticmethod
    def run_in_parallel(*functions):
        event=Event()
        processes=dict()
        for function in functions:
            fname=function[0]
            try:fargs=function[1]
            except:fargs=list()
            try:fproc=function[2]
            except:fproc=1
            for i in range(fproc):
                process=ExtProcess(event,target=fname,args=fargs)
                process.start()
                processes[process.pid]=process
        event.wait()
        for process in processes.values():
            process.terminate()
        for process in processes.values():
            process.join()

文件: w_Recorder.py

描述: 'capture' 函数用于抓取屏幕截图

from PIL import ImageGrab
import time

class Recorder(object):
    def capture(self):
        ImageGrab.grab().save("{f}.{e}".format(f=time.time(),e="png"))

文件: w_Decorators.py

描述: 并行运行给定函数以及 'Recorder' 类的 'capture' 方法

from w_Recorder import Recorder
from w_PythonHelper import PythonHelper

def check(function):
    def wrapper(*args):
        try:
            recorder=Recorder()
            PythonHelper.run_in_parallel([function,args],[recorder.capture])
            print("success")
        except Exception as e:
            print("failure: {}".format(e))
        return function
    return wrapper

文件: w_Logger.py

描述: 主程序(生成错误)

from w_Decorators import check
import time

class Logger(object):

    @check
    def run_testcase(self):
        # example function (runtime: 20s)
        for i in range(20):
            print("number: {}".format(i))
            time.sleep(1)

    def run_logger(self):
        self.run_testcase()


if __name__=="__main__":
    logger=Logger()
    logger.run_logger()

文件: w_Logger.py

描述: 主程序(正常工作)

from w_PythonHelper import PythonHelper
from w_Recorder import Recorder
import time

class Logger(object):

    def run_testcase(self):
        # example function (runtime: 20s)
        for i in range(20):
            print("number: {}".format(i))
            time.sleep(1)

    def run_logger(self):
        recorder=Recorder()
        PythonHelper.run_in_parallel([self.run_testcase],[recorder.capture])

if __name__=="__main__":
    logger=Logger()
    logger.run_logger()

这两个情况下相同的方法为什么会有不同的表现呢?


编辑: 有人知道如何解决这个问题吗(这是 Python 的 bug 吗)?如果没有,也许有人知道在应用程序运行时捕获屏幕截图的好方法?

实际上我发现了一个类似的问题: multiprocessing.Process 子类在 Linux 上工作但在 Windows 上不工作

答案是: 要解决这个问题,可以移除进程成员。,但我该如何在我的例子中做到这一点呢。

在调试时,错误发生在调用 process.start() 时,位于 run_in_parallel(*functions) 中。


编辑2: 就像 ivan_pozdeev 写的那样:我可以把包装器当作函数使用,但不能把它当作装饰器使用。我有很多函数是用这个装饰器装饰的,最简单的方法是在装饰器内部使用多进程。但不幸的是,我无法解决这个问题。也许有人已经解决了类似的问题。我会很感激任何提示。

'run_in_parallel' 函数按我想要的方式工作。两个或多个函数并行运行,第一个完成的函数会强制终止第二个函数。当我调用 wrapper(function,*args) 时,函数正常工作,但当我把这个机制放在装饰器内部时,就出现了 '无法序列化函数...没有被找到为' 的错误。详细信息可以在上面找到。

我的错误追踪:

Traceback (most recent call last):
  File "C:\Interpreters\Python32\lib\pickle.py", line 679, in save_global
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'run_testcase'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\EskyTests\w_Logger.py", line 19, in <module>
    logger.run_logger()
  File "C:\EskyTests\w_Logger.py", line 14, in run_logger
    self.run_testcase()
  File "C:\EskyTests\w_Decorators.py", line 14, in wrapper
    PythonHelper.run_in_parallel([function,args],[recorder.capture])
  File "C:\EskyTests\w_PythonHelper.py", line 25, in run_in_parallel
    process.start()
  File "C:\Interpreters\Python32\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 267, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 190, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 237, in dump
    self.save(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 344, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Interpreters\Python32\lib\pickle.py", line 432, in save_reduce
    save(state)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 623, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Interpreters\Python32\lib\pickle.py", line 656, in _batch_setitems
    save(v)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 683, in save_global
    (obj, module, name))
_pickle.PicklingError: Can't pickle <function run_testcase at 0x00000000027725C8>: it's not found as __main__.run_testcase

2 个回答

1

这有点复杂,但我觉得发生的事情是,check 在定义类的时候存储了一个未绑定方法的引用。当你调用 run_logger 时,工作示例使用的是一个绑定方法 self.run_testcase 的引用。

我认为最好的办法是把 run_testcase 变成一个顶层函数,而不是类的方法。

另外,你的 capture 函数可能不会按你预期的那样工作——当前时间会在函数定义时被存储,每次截图都会覆盖之前的那个。你可能想在函数内部调用 time.time()

4

你传给 Process.__init__() 的函数在Windows上不能被序列化。可以看看这个链接了解更多信息:16.6 multiprocessing - 编程指南 - Windows

关于你在使用顶层函数时遇到的错误,我怀疑你定义它的方式导致每次生成的对象都不一样,因此在子进程中并不是“同一个对象”。如果你真的需要这么复杂的功能,我建议你传一个简单的顶层函数,让它通过反射调用 run_testcase 更新:这个方法没有解决问题。

更新:

我通过去掉 run_testcaserun_in_parallelcapture 的装饰器让它们正常工作。@check 装饰器被替换成了 def wrapper(function,*args),功能是一样的:

import traceback
def wrapper(function,*args):
    try:
        recorder=Recorder()
        PythonHelper().run_in_parallel([function,args],[recorder.capture])
        print("success")
    except Exception,e:
        print("failure: "+traceback.format_exc(10))

主函数:

from w_Decorators import wrapper

if __name__=="__main__":
    logger=Logger()
    wrapper(logger.run_testcase)

果然,带装饰器的对象是不能被序列化的。

撰写回答