为什么在for循环中使用Python的多进程处理尴尬的并行问题时,没有速度提升,且共享了numpy数据?

16 投票
2 回答
2319 浏览
提问于 2025-04-16 08:09

我想加快一个与贝叶斯推断相关的并行问题。我的目标是根据一个矩阵A,推断出一组图像x的系数u,使得X = A*U。这里,X的维度是mxn,A是mxp,而U是pxn。对于X的每一列,我都需要推断出对应的系数U的最佳列。最后,这些信息会用来更新A。我使用的参数是m = 3000,p = 1500,n = 100。

因为这是一个线性模型,系数矩阵u的推断由n个独立的计算组成。所以,我尝试使用Python的多进程模块,但没有加快速度。

以下是我做的事情:

没有并行化的主要结构是:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

多进程的实现:

我尝试实现多进程。我有一台可以使用的8核机器。

  1. 有3个for循环。看起来唯一可以“并行化”的是第三个循环,在这个循环中推断系数:
    • 生成一个队列,将从0到batch_size-1的迭代数字放入队列中
    • 生成8个进程,让它们处理队列中的任务
  2. 使用multiprocessing.Array共享数据U

所以,我用以下代码替换了第三个循环:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

这里是类Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

这里是函数infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

现在的问题有以下几点:

  1. 对于给定的数据维度,多进程版本的速度并没有比单线程版本快。
  2. 每次迭代时,进程ID都在增加。这是否意味着每次都在生成新的进程?这样会不会产生很大的开销?我该如何避免?有没有可能在整个for循环中创建8个不同的进程,并仅用数据更新它们?
  3. 我在进程之间共享系数U的方式是否会减慢计算速度?有没有其他更好的方法?
  4. 使用进程池会不会更好?

我非常感谢任何帮助!我一个月前才开始学习Python,现在有点迷茫。

Engin

2 个回答

3

最终,这一切归结为一个问题:是否可以在主循环之外启动一些进程,然后在每次循环中把更新后的变量传给这些进程,让它们处理数据,并从所有进程中收集新计算出来的数据,而不需要在每次循环时都重新启动新的进程?

6

每次你创建一个进程的时候,其实就是在新建一个进程。如果你在一个循环里这样做,那每次循环都会启动新的进程。听起来你想要的做法是把队列和进程的初始化放在循环外面,然后在循环里面往队列里添加内容。

我之前用过multiprocessing.Pool,它确实挺有用的,但相比你已经用队列实现的功能,它并没有提供太多额外的东西。

撰写回答