python 多进程 JoinableQueue PicklingError

0 投票
1 回答
1796 浏览
提问于 2025-04-18 10:45

抱歉……看起来我问了一个热门问题,但在StackOverflow上找不到对我情况有帮助的答案 :P

我的代码做了以下几件事:

第一步:父进程把任务对象写入到 multiprocessing.JoinableQueue。

第二步:子进程(不止一个)从 JoinableQueue 中读取(获取)任务对象并执行任务。

我的模块结构是:

A.py

  • 类 Task(object)

  • 类 WorkerPool(object)

  • 类 Worker(multiprocessing.Process)

    • def run() # 这里执行第二步

  • 类 TestGroup()

    • def loadTest() # 这里执行第一步,也就是添加 Task 的对象

我理解的是,当使用 mp.JoinableQueue 时,添加的对象应该是可以被“序列化”的,我从这个链接中明白了“可序列化”的意思。

我的问题是:

  1. 在我的情况下,Task 对象是可以被序列化的吗?

  2. 当代码把任务对象添加到 JoinableQueue 时,我遇到了下面的错误:

    文件 "/usr/lib/python2.6/multiprocessing/queues.py",第 242 行,在 _feed

2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object1

2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object2

2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object3

发送(obj)

PicklingError: 无法序列化:属性查找 pysphere.resources.VimService_services_types.DynamicData_Holder 失败

  1. mp.JoinableQueue 的一般用法是什么?在我的情况下,我需要使用 join() 和 task_done()。

  2. 当我选择使用 Queue.Queue 而不是 mp.JoinableQueue 时,序列化错误就消失了。然而,从日志中查看,我发现所有子进程一直在处理队列中的第一个对象,这种情况可能是什么原因呢?

1 个回答

2

Python中的multiprocessing模块可以让你同时运行多个任务。因为每个任务都是在不同的进程中运行,它们之间不能直接共享内存,所以需要通过一种叫做序列化的数据格式来进行沟通。这个模块使用了pickle模块来进行序列化,因此你传递给任务的对象必须是可以被pickle处理的。

1) 你的任务对象似乎包含了一个来自pysphere.resource.VimService_services_types的实例。这可能是指向某个系统资源的引用,比如一个打开的文件。这样的对象不能被序列化,也不能从一个进程传递到另一个进程,所以就会出现pickle错误。

你可以使用mp.JoinableQueue,将需要的参数传递给任务,并让任务在自己的进程中启动服务,这样服务就只在那个进程里了。

举个例子:

queue = mp.JoinableQueue()
# not queue.put(task), since the new process will create the task
queue.put(task_args)

def f(task_args):
    task = Task(task_args)
    ...
    # you can't return the task, unless you've closed all non-serializable parts
    return task.result

process = Process(target=f, args=(queue,))
... 

2) Queue.Queue是为线程设计的。它使用共享内存和同步机制来提供原子操作(也就是确保操作的完整性)。但是,当你用multiprocessing启动一个新进程时,它会复制初始进程,因此每个子进程都会在相同的队列对象上工作,因为内存中的队列已经被复制到每个进程里。

撰写回答