如何在Python ThreadPool中使用初始化器

5 投票
1 回答
2176 浏览
提问于 2025-04-17 00:20

我正在尝试使用PyFFTW进行多线程卷积,以便同时计算大量的二维卷积。因为在进行Numpy操作时,GIL(全局解释器锁)会被释放,所以不需要单独的进程。

这里有一个经典的做法:

http://code.activestate.com/recipes/577187-python-thread-pool/

(Py)FFTW之所以快,是因为它可以重用计划。为了避免访问冲突错误,这些计划需要为每个线程单独设置,像这样:

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True

        # Make separate fftw plans for each thread.
        flag_for_fftw='patient'      
        self.inputa = np.zeros(someshape, dtype='float32')
        self.outputa = np.zeros(someshape_semi, dtype='complex64')

        # create a forward plan.
        self.fft = fftw3.Plan(self.inputa,self.outputa, direction='forward', flags=[flag_for_fftw],nthreads=1)         

        # Initialize the arrays for the inverse fft.
        self.inputb = np.zeros(someshape_semi, dtype='complex64')
        self.outputb = np.zeros(someshape, dtype='float32')

        # Create the backward plan.
        self.ifft = fftw3.Plan(self.inputb,self.outputb, direction='backward', flags=[flag_for_fftw],nthreads=1)               
        self.start() 

这样,你就可以在Worker类的run方法中将参数self.inputaself.outputaself.fftself.inputbself.outputbself.ifft传递给实际的卷积器。

这听起来不错,但我们也可以导入ThreadPool类:

from multiprocessing.pool import ThreadPool

那么我应该如何在ThreadPool中定义初始化器,以获得相同的结果呢?根据文档,http://docs.python.org/library/multiprocessing.html,"每个工作进程在启动时会调用initializer(*initargs)"。你可以在Python源代码中轻松检查这一点。

但是,当你设置Threadpool,比如说用2个线程:

po = ThreadPool(2,initializer=tobedetermined)

然后你运行它,可能是在某个循环中:

po.apply_async(convolver,(some_input,))

那么你如何让卷积器通过初始化器进行设置呢?如何让它在每个线程中使用不同的FFTW计划,而不需要为每个卷积重新计算FFTW计划呢?

祝好,
Alex。

1 个回答

1

你可以用一个函数来包裹这个卷积器的调用,这个函数会使用线程本地存储(threading.local())来初始化PyFFTW,并且记住这个结果。

撰写回答