python 多进程 JoinableQueue PicklingError
抱歉……看起来我问了一个热门问题,但在StackOverflow上找不到对我情况有帮助的答案 :P
我的代码做了以下几件事:
第一步:父进程把任务对象写入到 multiprocessing.JoinableQueue。
第二步:子进程(不止一个)从 JoinableQueue 中读取(获取)任务对象并执行任务。
我的模块结构是:
A.py
类 Task(object)
类 WorkerPool(object)
类 Worker(multiprocessing.Process)
def run() # 这里执行第二步
类 TestGroup()
def loadTest() # 这里执行第一步,也就是添加 Task 的对象
我理解的是,当使用 mp.JoinableQueue 时,添加的对象应该是可以被“序列化”的,我从这个链接中明白了“可序列化”的意思。
我的问题是:
在我的情况下,Task 对象是可以被序列化的吗?
当代码把任务对象添加到 JoinableQueue 时,我遇到了下面的错误:
文件 "/usr/lib/python2.6/multiprocessing/queues.py",第 242 行,在 _feed
2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object1
2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object2
2014-06-23 03:18:43 信息 TestGroup: G1 结束加载测试:object3
发送(obj)
PicklingError: 无法序列化:属性查找 pysphere.resources.VimService_services_types.DynamicData_Holder 失败
mp.JoinableQueue 的一般用法是什么?在我的情况下,我需要使用 join() 和 task_done()。
当我选择使用 Queue.Queue 而不是 mp.JoinableQueue 时,序列化错误就消失了。然而,从日志中查看,我发现所有子进程一直在处理队列中的第一个对象,这种情况可能是什么原因呢?
1 个回答
Python中的multiprocessing
模块可以让你同时运行多个任务。因为每个任务都是在不同的进程中运行,它们之间不能直接共享内存,所以需要通过一种叫做序列化的数据格式来进行沟通。这个模块使用了pickle模块来进行序列化,因此你传递给任务的对象必须是可以被pickle处理的。
1) 你的任务对象似乎包含了一个来自pysphere.resource.VimService_services_types的实例。这可能是指向某个系统资源的引用,比如一个打开的文件。这样的对象不能被序列化,也不能从一个进程传递到另一个进程,所以就会出现pickle错误。
你可以使用mp.JoinableQueue,将需要的参数传递给任务,并让任务在自己的进程中启动服务,这样服务就只在那个进程里了。
举个例子:
queue = mp.JoinableQueue()
# not queue.put(task), since the new process will create the task
queue.put(task_args)
def f(task_args):
task = Task(task_args)
...
# you can't return the task, unless you've closed all non-serializable parts
return task.result
process = Process(target=f, args=(queue,))
...
2) Queue.Queue是为线程设计的。它使用共享内存和同步机制来提供原子操作(也就是确保操作的完整性)。但是,当你用multiprocessing启动一个新进程时,它会复制初始进程,因此每个子进程都会在相同的队列对象上工作,因为内存中的队列已经被复制到每个进程里。