在Python中为迭代任务创建n个进程

1 投票
1 回答
1464 浏览
提问于 2025-04-21 08:19

我在使用Python的多进程模块时遇到了一个复杂的问题。我写了一个脚本,需要在一个特定的列表中的每个元素上调用一个多参数的函数(叫做call_function)。我的想法是定义一个整数'N',然后把这个问题分配给多个子进程来处理。

li=[a,b,c,d,e] #elements are int's
for element in li:
    call_function(element,string1,string2,int1)

call_summary_function()

总结函数会分析循环中所有迭代得到的结果。现在,我希望每次迭代都由一个单独的子进程来完成,但总的子进程数量不能超过N。如果超过了,主进程就要等到其中一个子进程结束后,再进行下一次迭代。而且,在所有子进程完成后,还需要调用call_summary_function。

我尝试了很多方法,使用多进程模块、锁和全局变量来保持当前运行的子进程数量(以便与N进行比较),但每次都会出错。

//--------------编辑-------------//

首先,这是主进程的代码:

MAX_PROCESSES=3
lock=multiprocessing.Lock()
processes=0
k=0
while k < len(k_list):

    if processes<=MAX_PROCESSES: # running processes <= 'N' set by me

        p = multiprocessing.Process(target=single_analysis, args=(k_list[k],main_folder,training_testing,subsets,positive_name,ratio_list,lock,processes))
        p.start()
        k+=1

    else: time.sleep(1)


while processes>0: time.sleep(1)

现在:这是被多进程调用的函数:

def single_analysis(k,main_folder,training_testing,subsets,positive_name,ratio_list,lock,processes):

lock.acquire()
processes+=1
lock.release()

#stuff to do

lock.acquire()
processes-=1
lock.release()

我遇到的错误是,整数值(进程变量)总是等于0,因为single_analysis()函数似乎创建了一个新的本地变量processes。当我把processes改为全局变量,并在single_analysis()中用global关键字导入它,并在函数内打印print processes in时,我得到的结果是len(li)次1...

1 个回答

1

你所描述的情况非常适合使用 multiprocessing.Pool,特别是它的 map 方法:

import multiprocessing
from functools import partial

def call_function(string1, string2, int1, element):
    # Do stuff here

if __name__ == "__main__":
    li=[a,b,c,d,e]
    p = multiprocessing.Pool(N)  # The pool will contain N worker processes.

    # Use partial so that we can pass a method that takes more than one argument to map.
    func = partial(call_function, string1,string2,int1)

    results = p.map(func, li)
    call_summary_function(results)

p.map 会对 li 列表中的每一个 element 调用 call_function(string1, string2, int1, element)results 将会是一个列表,里面包含了每次调用 call_function 返回的值。你可以把这个列表传给 call_summary_function 来处理结果。

撰写回答