我想使用ray对列表中的每个元素并行执行函数操作。下面是一个简化的代码片段
import numpy as np
import time
import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)
@ray.remote
def f(a, b, c):
return a * b - c
def g(a, b, c):
return a * b - c
def my_func_par(large_list):
# arguments a and b are constant just to illustrate
# argument c is is each element of a list large_list
[f.remote(1.5, 2, i) for i in large_list]
def my_func_seq(large_list):
# arguments a anf b are constant just to illustrate
# argument c is is each element of a list large_list
[g(1.5, 2, i) for i in large_list]
my_list = np.arange(1, 10000)
s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007
s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372
问题是,当我计时my_func_par
时,它比my_func_seq
慢得多(~54x,如上所示)。ray的一位作者确实回答了关于this blog的评论,该评论似乎解释了我正在做的是设置不同的任务,这是不正确的
如何使用ray并修改上面的代码以并行运行它?(可能通过将large_list
分割成块,块的数量等于CPU的数量)
编辑:这个问题有两个重要标准
f
需要接受多个参数ray.put(large_list)
,以便larg_list
变量可以存储在共享内存中,而不是复制到每个处理器
要补充上述内容:
RayDistributed multiprocessing.Pool支持固定大小的Ray Actor池,以便于并行化
使用上面的代码,
my_func_par
运行得更快(大约0.1秒)。如果您使用代码并通过类似time.sleep
的方式使f(x)
变慢,您可以看到多处理的明显优势并行化版本速度较慢的原因是运行ray任务不可避免地会有运行开销(尽管它会花费大量精力对其进行优化)。这是因为并行运行需要进程间通信、序列化等
这就是说,如果你的函数真的很快(与运行函数的速度一样快,比分布式计算中的其他开销花费更少的时间,在分布式计算中,你的代码是完全正确的,因为函数f真的很小。我想运行该函数只需要不到一微秒的时间)
这意味着您应该使f函数的计算量更大,以便从并行化中获益。您提出的解决方案可能不起作用,因为即使在这之后,函数f可能仍然足够轻量级,这取决于您的列表大小
相关问题 更多 >
编程相关推荐