How do I use multiprocessing correctly in this situation?


This is my first attempt at concurrent programming and I need some help figuring this out.

I am trying to implement a learning curve function, like the one in sklearn.model_selection, in a module of my own. I have two loops: one iterates over the number of samples to use (in this case a fraction size_i in [0, 1]), and the other repeats the fit-and-score step (lc_fit_and_score below) j times for each sample size.

The goal is to parallelize lc_fit_and_score, which gets passed the model, the number of samples, the training and test sets to be scored, and a callable scorer. The approach I implemented (see the code below) works, but it is many times slower than my previous serial code. I assume this is due to the overhead of serializing all the arguments passed to lc_fit_and_score.
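
(As a rough sanity check of that assumption, the pickled size of the arguments can be measured directly; the array shapes below are just placeholders, not my real data.)

import pickle
import numpy as np

# Placeholder arrays standing in for the real training data
X_train = np.random.rand(50000, 100)
y_train = np.random.rand(50000, 1)

# Rough size of what every apply_async call has to serialize and ship
# to a worker process (model, test set, etc. come on top of this)
payload = pickle.dumps((X_train, y_train))
print(f"per-task payload: {len(payload) / 1e6:.1f} MB")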

Since these arguments do not change across the j iterations, I figure it must be possible to pass them only once and thereby cut down the serialization overhead. I just can't come up with a way to do it.

Here is my code:

import os
import numpy as np
from multiprocessing import Pool
from sklearn.base import clone, is_classifier
from sklearn.model_selection import train_test_split
# shuffle_in_unison_2d is a helper of my own (not shown) that shuffles X and y together

def lc_fit_and_score(size_i, model, X_train, y_train, X_test, y_test, scorer, strat):
    model_i = clone(model)
    if size_i == 1:
        # last iteration: use the full training set, just shuffled
        X_i, y_i = shuffle_in_unison_2d(X_train, y_train)
    else:
        X_i, _, y_i, _ = train_test_split(X_train, y_train, train_size=size_i, stratify=strat)
    model_i.fit(X_i, y_i.ravel())
    return scorer(model_i, X_i, y_i), scorer(model_i, X_test, y_test)

def learning_curve_mp(X_train, y_train, X_test, y_test, model, scorer, **kwargs):
    n_jobs = kwargs.pop('n_jobs', 1)
    n_jobs = len(os.sched_getaffinity(0)) if n_jobs==-1 else n_jobs
    n = kwargs.pop('n', 20)
    cv = kwargs.pop('cv', 10)
    y_window = kwargs.pop('y_window', None)
    test_idx = kwargs.pop('test_idx', []) 
    fname = kwargs.pop('fname', None)
    order = kwargs.pop('order', None)

    if kwargs:
        raise TypeError("Invalid parameters passed: {}".format(kwargs))

    strat = y_train if is_classifier(model) else np.array(y_train > 0, dtype=int)
    n_samples = []
    train_loss = np.zeros((n, cv))
    test_loss = np.zeros((n, cv))
    if n_jobs == 1:
        # serial version
        for i, size_i in enumerate(np.linspace(0, 1, n+1)[1:]):
            for j in range(cv):
                train, test = lc_fit_and_score(size_i, model, X_train, y_train, X_test, y_test, scorer, strat)
                train_loss[i, j] = train
                test_loss[i, j] = test
            n_samples.append(int(X_train.shape[0]*size_i))
    else:
        # parallel version
        for i, size_i in enumerate(np.linspace(0, 1, n+1)[1:]):
            pool = Pool(n_jobs)
            result_objects = [pool.apply_async(lc_fit_and_score, args=(size_i, model, X_train, y_train, X_test, y_test, scorer, strat)) for _ in range(cv)]
            pool.close()
            pool.join()
            train_loss[i] = [x.get()[0] for x in result_objects]
            test_loss[i] = [x.get()[1] for x in result_objects]
            n_samples.append(int(X_train.shape[0]*size_i))

Ideally the loop over the number of samples could be parallelized as well, but I am not sure how to do that either. Any help is appreciated!


1 Answer

So I looked around for ways to reduce the overhead and found this thread: How can global variables be accessed when using Multiprocessing and Pool? It seems like a somewhat hacky way of doing things, so if anyone knows a better one, please share.
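
Boiled down, the pattern from that thread looks roughly like this (the names below are illustrative, not from my actual code): the Pool initializer runs once per worker process and stores the large, read-only objects in a module-level global, so they are pickled once per worker instead of once per task.

import numpy as np
from multiprocessing import Pool

def init_worker(shared):
    # Runs once per worker process: keep the large, read-only objects in a
    # module-level global instead of pickling them for every single task
    global _shared
    _shared = shared

def task(i):
    # Tasks only receive a tiny argument; the big array comes from the global
    return _shared['data'][i].sum()

if __name__ == '__main__':
    payload = {'data': np.random.rand(100, 10000)}
    with Pool(4, initializer=init_worker, initargs=(payload,)) as pool:
        results = pool.map(task, range(100))
    print(sum(results))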

Rearranging the code from above into the following:

def lc_fit_and_score(size_i):
    X_train = global_obj['X_train']
    X_test = global_obj['X_test']
    y_train = global_obj['y_train']
    y_test = global_obj['y_test']
    model_i = clone(global_obj['model'])
    scorer = global_obj['scorer']
    strat = global_obj['strat']
    cv = global_obj['cv']
    train = np.zeros(cv)
    test = np.zeros(cv)
    for j in range(cv):
        if size_i == 1:
            #last iteration
            X_i, y_i = shuffle_in_unison_2d(X_train, y_train)
        else:
            X_i, _, y_i, _ = train_test_split(X_train, y_train, train_size=size_i, stratify=strat)
        model_i.fit(X_i, y_i.ravel())
        train[j] = scorer(model_i, X_i, y_i)
        test[j] = scorer(model_i, X_test, y_test)
    return train, test

def make_global(obj):
    global global_obj
    global_obj = obj

tmp = {'model': model,
       'X_train': X_train,
       'y_train': y_train,
       'X_test': X_test,
       'y_test': y_test,
       'scorer': scorer,
       'strat': strat,
       'cv': cv
      }

if n_jobs == 1:
    # serial version
    make_global(tmp)
    for i, size_i in enumerate(np.linspace(0, 1, n+1)[1:]):
        train, test = lc_fit_and_score(size_i)
        train_loss[i, :] = train
        test_loss[i, :] = test
        n_samples.append(int(X_train.shape[0]*size_i))
else:
    # parallel version - Note: huge overhead, only worth it for CPU-intensive model fits
    pool = Pool(n_jobs, initializer=make_global, initargs=(tmp,))
    result_objects = pool.map(lc_fit_and_score, [size_i for size_i in np.linspace(0, 1, n+1)[1:]])
    pool.close()
    pool.join()
    for i, result in enumerate(result_objects):
        train_loss[i, :] = result[0]
        test_loss[i, :] = result[1]
    n_samples = [int(X_train.shape[0]*size_i) for size_i in np.linspace(0, 1, n+1)[1:]]
del tmp

Testing with a linear regression model and cv=100, I get performance comparable to the serial version; with cv=1000 the parallel version is about 2.4x faster. I expect the gap between the serial and parallel versions to shift in favour of the parallel one at lower cv values for more complex models, i.e. once the model fit becomes the CPU-bound part.
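
A comparison like the one above can be reproduced with a plain wall-clock timing of the two code paths, roughly like this (placeholder data and model; learning_curve_mp is assumed to keep the signature from the question, with the rearranged internals wrapped back inside it):

import time
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import get_scorer
from sklearn.model_selection import train_test_split
# learning_curve_mp as defined in the question

if __name__ == '__main__':
    # Placeholder regression data; the heavier the model fit, the more the
    # parallel path pays off
    X = np.random.rand(5000, 20)
    y = np.random.randn(5000, 1)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    model = LinearRegression()
    scorer = get_scorer('neg_mean_squared_error')

    for jobs in (1, 4):
        t0 = time.perf_counter()
        learning_curve_mp(X_train, y_train, X_test, y_test, model, scorer,
                          n_jobs=jobs, n=20, cv=100)
        print(f"n_jobs={jobs}: {time.perf_counter() - t0:.1f} s")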
