我有个不寻常的问题要问python。我使用multiprocessing
库来映射函数f((dynamic1, dynamic2), fix1, fix2)
。你知道吗
import multiprocessing as mp
fix1 = 4
fix2 = 6
# Numer of cores to use
N = 6
dynamic_duos = [(a, b) for a in range(5) for b in range(10)]
with mp.Pool(processes = N) as p:
p.starmap(f, [(dyn, fix1, fix2) for dyn in dynamic_duos])
我想动态控制活动进程的数量,因为该函数实际上有时会泵送大量RAM。其思想是在每次迭代时(即在函数f
的任何调用之前)检查sum(dyn)
是否低于阈值,以及RAM量是否高于阈值。如果条件匹配,则可以启动一个新进程并计算函数。你知道吗
另外一个条件是最大进程数:PC上的内核数
感谢您的帮助:)
编辑:原因详情。
某些参数组合将具有高RAM消耗(在一个进程上高达80 Gb)。我或多或少地知道哪些进程将使用大量的RAM,当程序遇到它们时,我希望等待另一个进程结束,在单个进程中开始这个高RAM消耗的组合,然后在组合的其余部分用更多的进程继续计算以映射。你知道吗
根据下面的答案编辑我的尝试:
它不起作用,但不会引起错误。它只是完成了程序。你知道吗
# Imports
import itertools
import concurrent.futures
# Parameters
N = int(input("Number of CPUs to use: "))
t0 = 0
tf = 200
s_step = 0.05
max_s = None
folder = "test"
possible_dynamics = [My_class(x) for x in [20, 30, 40, 50, 60]]
dynamics_to_compute = [list(x) for x in itertools.combinations_with_replacement(possible_dynamics , 2)] + [list(x) for x in itertools.combinations_with_replacement(possible_dynamics , 3)]
function_inputs = [(dyn , t0, tf, s_step, max_s, folder) for dyn in dynamics_to_compute]
# -----------
# Computation
# -----------
start = time.time()
# Pool creation and computation
futures = []
pool = concurrent.futures.ProcessPoolExecutor(max_workers = N)
for Obj, t0, tf, s_step, max_s, folder in function_inputs:
if large_memory(Obj, s_step, max_s):
concurrent.futures.wait(futures) # wait for all pending tasks
large_future = pool.submit(compute, Obj, t0, tf,
s_step, max_s, folder)
large_future.result() # wait for large computation to finish
else:
future = pool.submit(compute, Obj, t0, tf,
s_step, max_s, folder)
futures.append(future)
end = time.time()
if round(end-start, 3) < 60:
print ("Complete - Elapsed time: {} s".format(round(end-start,3)))
else:
print ("Complete - Elapsed time: {} mn and {} s".format(int((end-start)//60), round((end-start)%60,3)))
os.system("pause")
这仍然是我的代码的一个简化示例,但其思想就在这里。它的运行时间不到0.2秒,这意味着他实际上从未调用过函数compute
。你知道吗
注意:Obj
不是实际的变量名。你知道吗
为了实现这一点,您需要放弃使用
map
,以获得对任务执行流的更多控制。你知道吗这段代码实现了您在问题末尾描述的算法。我建议使用
concurrent.futures
库,因为它公开了一组更整洁的api。你知道吗相关问题 更多 >
编程相关推荐