Python中多进程保持队列顺序

1 投票
1 回答
49 浏览
提问于 2025-04-12 14:00

我正在尝试在多个处理器上运行loess回归,但当我的代码把每个并发的数据框添加到队列时,它们的顺序都乱了,导致生成的图看起来很糟糕。

def smooth_data_mp(data_frame):
    num_processes = 8
    chunk_size = 125000
    fraction = 125 / chunk_size
    print(data_frame.head())
    result_queue = Manager().Queue()
    with Pool(processes=num_processes) as pool:
        pool.starmap(process_data, [(data_frame, i, chunk_size, fraction, result_queue) for i in
                                    range(len(data_frame) // chunk_size)])

    # Collect results from the queue in order
    result_list = pd.DataFrame(result_queue.get())
    while not result_queue.empty():
        result_list = pd.concat([result_list, result_queue.get()])
    return result_list

def process_data(dataframe, i, chunk_size, fraction, result_queue):
    start_frame = chunk_size * i
    end_frame = min(chunk_size * (i + 1), len(dataframe))  # Ensure end_frame doesn't exceed length of sampleData
    print(f'{start_frame}, {end_frame}') # just for debugging
    new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame], chunk_size, fraction, i)
    result_queue.put(new_data_frame)

我该如何确保在process_data函数中添加到队列的每个数据框,都是按照原始数据集中出现的顺序添加的,而不是等到处理完成后再添加?

我尝试过使用不同类型的队列,比如普通队列和管理队列,但只有管理队列有效……不过我不太确定该如何解决这些问题。

1 个回答

3

问题在于,你在新的数据框(dataframe)一有新数据就立刻把它们放进一个队列里。但其实你不需要用队列,你可以直接使用starmap的返回值:

with Pool(processes=num_processes) as pool:
    results = pool.starmap(process_data, 
         [(data_frame, i, chunk_size, fraction) 
          for i in range(len(data_frame) // chunk_size)])
return pd.concat(results)

这样做可以保持输入数据框的原始顺序。接下来,process_data函数应该修改为:

def process_data(dataframe, i, chunk_size, fraction):
    ...similar code as original...
    return new_data_frame

(如果你出于某种原因确实想保留输出结果的队列,那么确保结果的顺序最简单的方法就是给处理后的数据框加上一个明确的行索引,最后再根据这个索引对合并后的数据框进行排序。在这里使用共享的输出队列,我觉得只有在你有其他线程或进程需要对这些结果进行进一步处理时才有意义,但在这种情况下似乎并不是这样。)
(还可以查看multiprocessing.managers.SyncManager中的示例“测试代码”)

撰写回答