Python multiprocessing - cannot join current thread

Posted 2024-05-09 17:26:42


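I'm splitting a large ctypes array into slices and processing them in parallel. I get the error below, and I believe it happens because one slice of the array finishes processing before another. I tried using process.join() to make the first set of processes wait, but that didn't work. Any ideas?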

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

Using:

    ....

        with closing(multiprocessing.Pool(initializer=init(array))) as p:
            del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

            step = y // cores
            if step != 0:
                jobs =[]
                for i in range (0, y, step):
                    process = p.Process(target=stretch, args= (shared_arr,slice(i, i+step)),kwargs=options)
                    jobs.append(process)
                    process.start()

                for j in jobs:
                    j.join()

    del jobs
    del process

Update:

    # Create a ctypes array
    array = ArrayConvert.SharedMemArray(array)
    # Create a global of options
    init_options(options)  # options is a dict
    with closing(multiprocessing.Pool(initializer=init(array))) as p:
        del array  # Since the array is now stored in a shared array, destroy the array ref for memory reasons

        step = y // cores
        if step != 0:
            for i in range(0, y, step):
                # Package all the options into a global dictionary

                p.map_async(stretch, [slice(i, i+step)])

                # p.apply_async(stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)

    p.join()

    def init_options(options_):
        global kwoptions
        kwoptions = options_

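The function I pass to map_async lives in another module, so I'm struggling to get the global kwoptions into that function. Passing globals between modules like this doesn't seem right (it's unpythonic). Is this the way to pass kwargs through map_async?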

Should I rework the multiprocessing using something different (apply or Process)?


2 answers

So I got this working by rewriting the code and removing the pool (per J.F. Sebastian's comment).

In pseudocode:

initialize the shared array
determine step size
create an empty list of jobs
create the process, pass it the kwargs, and append it to the job list
start the jobs
join the jobs
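The "determine step size" step deserves a closer look. With step = y // cores, as in the answer's code, step is 0 whenever y < cores, so no job is created at all; and when y is not a multiple of cores, range(0, y, step) yields more slices than cores (y = 10, cores = 4 gives step = 2 and five slices). A minimal sketch of an alternative, assuming the goal is at most one contiguous slice per core, uses ceiling division instead; chunk_slices is a hypothetical helper name, not part of the original code:

```python
def chunk_slices(y, cores):
    # Hypothetical helper: split range(y) into at most `cores` contiguous slices.
    # Ceiling division keeps step >= 1 whenever y > 0, so small arrays still get processed.
    if y <= 0 or cores <= 0:
        return []
    step = -(-y // cores)  # ceil(y / cores) using only integer arithmetic
    return [slice(i, min(i + step, y)) for i in range(0, y, step)]

print(chunk_slices(10, 4))  # [slice(0, 3, None), slice(3, 6, None), slice(6, 9, None), slice(9, 10, None)]
print(chunk_slices(3, 8))   # [slice(0, 1, None), slice(1, 2, None), slice(2, 3, None)]
```

Each slice can then be handed to one Process exactly as in the answer's loop.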

In case this helps any Googlers, here is the code:

# Initialize the ctypes array
init(array)
# Remove the reference to the array (to preserve memory on multiple iterations)
del array

step = y // cores
jobs = []
if step != 0:
    for i in range(0, y, step):
        p = multiprocessing.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
        jobs.append(p)

    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
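For experimentation, here is a self-contained version of the snippet above. It is a sketch, not the poster's actual code: stretch() and its factor keyword are hypothetical stand-ins, and the shared array is created directly with multiprocessing.RawArray rather than through init() and ArrayConvert.

```python
import multiprocessing

def stretch(arr, interval, factor=1.0):
    # Hypothetical stand-in for the real stretch(): scales one slice in place
    for k in range(*interval.indices(len(arr))):
        arr[k] = arr[k] * factor

def run(y, cores, options):
    # Shared, lock-free ctypes array of doubles holding 0.0 .. y-1
    shared_arr = multiprocessing.RawArray('d', [float(k) for k in range(y)])
    step = y // cores
    jobs = []
    if step != 0:
        for i in range(0, y, step):
            # One Process per slice; the keyword options go straight into kwargs
            p = multiprocessing.Process(target=stretch,
                                        args=(shared_arr, slice(i, i + step)),
                                        kwargs=options)
            jobs.append(p)
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()
    # Children wrote into shared memory, so the parent reads the results directly
    return list(shared_arr)

if __name__ == "__main__":
    print(run(8, 4, {"factor": 2.0}))  # [0.0, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0]
```

Because the RawArray is passed in args, the children modify the same memory the parent reads back, so no return values are needed. Note that run(3, 8, ...) returns the data unchanged, since step is 0 in that case.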

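The initializer argument of Pool() takes a function: replace initializer=init(array) with initializer=init, initargs=(array,). As written, init(array) is called immediately in the parent process, and its return value (None) is what Pool receives as the initializer.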
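A minimal sketch of the difference, assuming a lock-free RawArray shared with the workers; read_item() is a hypothetical worker used only for illustration:

```python
import multiprocessing as mp

def init(shared_array_):
    # Runs once inside each worker process when passed correctly as the initializer
    global shared_array
    shared_array = shared_array_

def read_item(i):
    # Hypothetical worker: reads the global that init() set in this worker
    return shared_array[i]

def main():
    array = mp.RawArray('i', [10, 20, 30, 40])
    # Wrong: init(array) executes right here, in the parent, and returns None,
    # so Pool receives initializer=None:
    #     mp.Pool(initializer=init(array))
    # Right: pass the function object and its arguments separately.
    with mp.Pool(processes=2, initializer=init, initargs=(array,)) as pool:
        return pool.map(read_item, range(4))

if __name__ == "__main__":
    print(main())  # [10, 20, 30, 40]
```

Under the fork start method the wrong version can appear to work, because the parent has already set the global before the workers are forked; under spawn (the default on Windows and macOS) the workers never see it and read_item() fails with a NameError.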

To pass keyword arguments to a function f() used with the pool.*map* family of methods, you can create a wrapper mp_f():

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs) # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N) # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4) # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args): # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__=="__main__":
    mp.freeze_support() # for py2exe and the-like on Windows
    main()
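The wrapper is one option. Another, not used in the answer above, is functools.partial from the standard library, which binds the keyword arguments once so that plain pool.map() can be used. A minimal sketch, where this f() is a toy version that returns a tuple instead of writing to the shared array:

```python
import functools
import multiprocessing as mp

def f(interval, a=None, b=None):
    # Toy stand-in for the real worker: returns its inputs instead of computing
    return (interval.start, a, b)

def main():
    options = dict(a=5, b=2)  # dummy keyword options
    intervals = [slice(i, i + 2) for i in range(0, 6, 2)]
    # partial freezes the keyword arguments; the partial object pickles
    # as long as f itself is a module-level function
    worker = functools.partial(f, **options)
    with mp.Pool(processes=2) as pool:
        return pool.map(worker, intervals)

if __name__ == "__main__":
    print(main())  # [(0, 5, 2), (2, 5, 2), (4, 5, 2)]
```

This also avoids the global kwoptions from the question. Unlike mp_f(), it does not log worker exceptions; pool.map() re-raises them in the parent instead.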
