我正在分割大型ctype数组并并行处理它们。我接收到下面的错误,并相信它,因为数组的一个片段正在另一个片段之前完成处理。我尝试使用process.join()让第一组进程等待,但这不起作用。思想?
Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
使用:
....
with closing(multiprocessing.Pool(initializer=init(array))) as p:
del array #Since the array is now stored in a shared array destroy the array ref for memory reasons
step = y // cores
if step != 0:
jobs =[]
for i in range (0, y, step):
process = p.Process(target=stretch, args= (shared_arr,slice(i, i+step)),kwargs=options)
jobs.append(process)
process.start()
for j in jobs:
j.join()
del jobs
del process
更新:
#Create an ctypes array
array = ArrayConvert.SharedMemArray(array)
#Create a global of options
init_options(options) #options is a dict
with closing(multiprocessing.Pool(initializer=init(array))) as p:
del array #Since the array is not stored in a shared array destroy the array ref for memory reasons
step = y // cores
if step != 0:
for i in range (0, y, step):
#Package all the options into a global dictionary
p.map_async(stretch,[slice(i, i+step)])
#p.apply_async(stretch,args=(shared_arr,slice(i, i+step)),kwargs=options)
p.join()
def init_options(options_):
global kwoptions
kwoptions = options_
我传递给map_async的函数存储在另一个模块中,因此我正在努力获取传递给该函数的全局权值。在这样的模块之间传递全局参数似乎是不对的(不符合语法)。这是通过map_async传递kwargs的方法吗。
我应该使用不同的东西(应用程序或进程)来重新处理多进程吗?
所以我通过重新编写代码和删除池(根据J.F.Sebastian的评论)来完成这项工作。
在伪代码中:
如果这对任何谷歌用户都有帮助的话,下面是代码:
Pool()
的参数接受函数;用initializer=init, initargs=(array,)
替换initializer=init(array)
要将关键字参数传递给与
pool.*map*
系列一起使用的函数f()
,可以创建包装器mp_f()
:相关问题 更多 >
编程相关推荐