Python根据可用RAM和函数参数的数量动态控制多处理脚本中的进程数

2024-04-19 23:42:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我有个不寻常的问题要问python。我使用multiprocessing库来映射函数f((dynamic1, dynamic2), fix1, fix2)。你知道吗

import multiprocessing as mp

fix1 = 4
fix2 = 6

# Numer of cores to use
N = 6

dynamic_duos = [(a, b) for a in range(5) for b in range(10)]

with mp.Pool(processes = N) as p:
    p.starmap(f, [(dyn, fix1, fix2) for dyn in dynamic_duos])

我想动态控制活动进程的数量,因为该函数实际上有时会泵送大量RAM。其思想是在每次迭代时(即在函数f的任何调用之前)检查sum(dyn)是否低于阈值,以及RAM量是否高于阈值。如果条件匹配,则可以启动一个新进程并计算函数。你知道吗

另外一个条件是最大进程数:PC上的内核数

感谢您的帮助:)

编辑:原因详情。

某些参数组合将具有高RAM消耗(在一个进程上高达80 Gb)。我或多或少地知道哪些进程将使用大量的RAM,当程序遇到它们时,我希望等待另一个进程结束,在单个进程中开始这个高RAM消耗的组合,然后在组合的其余部分用更多的进程继续计算以映射。你知道吗

根据下面的答案编辑我的尝试:

它不起作用,但不会引起错误。它只是完成了程序。你知道吗

# Imports
import itertools
import concurrent.futures

# Parameters
N = int(input("Number of CPUs to use: "))
t0 = 0
tf = 200
s_step = 0.05
max_s = None
folder = "test"

possible_dynamics = [My_class(x) for x in [20, 30, 40, 50, 60]]
dynamics_to_compute = [list(x) for x in itertools.combinations_with_replacement(possible_dynamics , 2)] + [list(x) for x in itertools.combinations_with_replacement(possible_dynamics , 3)]

function_inputs = [(dyn , t0, tf, s_step, max_s, folder) for dyn in dynamics_to_compute]

# -----------
# Computation
# -----------
start = time.time()

# Pool creation and computation
futures = []
pool = concurrent.futures.ProcessPoolExecutor(max_workers = N)

for Obj, t0, tf, s_step, max_s, folder in function_inputs:
    if large_memory(Obj, s_step, max_s):
        concurrent.futures.wait(futures)  # wait for all pending tasks
        large_future = pool.submit(compute, Obj, t0, tf, 
                             s_step, max_s, folder)
        large_future.result()  # wait for large computation to finish
    else:
        future = pool.submit(compute, Obj, t0, tf, 
                             s_step, max_s, folder)
        futures.append(future)

end = time.time()
if round(end-start, 3) < 60:
    print ("Complete - Elapsed time: {} s".format(round(end-start,3)))
else:
    print ("Complete - Elapsed time: {} mn and {} s".format(int((end-start)//60), round((end-start)%60,3)))

os.system("pause")

这仍然是我的代码的一个简化示例,但其思想就在这里。它的运行时间不到0.2秒,这意味着他实际上从未调用过函数compute。你知道吗

注意:Obj不是实际的变量名。你知道吗


Tags: toinfortime进程tfstepfolder
1条回答
网友
1楼 · 发布于 2024-04-19 23:42:42

为了实现这一点,您需要放弃使用map,以获得对任务执行流的更多控制。你知道吗

这段代码实现了您在问题末尾描述的算法。我建议使用concurrent.futures库,因为它公开了一组更整洁的api。你知道吗

import concurrent.futures

pool = concurrent.futures.ProcessPoolExecutor(max_workers=6)

futures = []

for dyn, fix1, fix2 in dynamic_duos:
    if large_memory(dyn, fix1, fix2):
        concurrent.futures.wait(futures)  # wait for all pending tasks
        large_future = pool.submit(f, dyn, fix1, fix2)
        large_future.result()  # wait for large computation to finish
    else:
        future = pool.submit(f, dyn, fix1, fix2)
        futures.append(future)

相关问题 更多 >