PythonRay:如何在工作人员之间共享变量?

2024-04-19 22:00:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个管道,其中包括从机器学习模型中获取预测,我正在尝试使用ray来加速它。输入可能会重复,因此我希望在函数中共享一个缓存,以便此远程函数的所有工作人员都可以共享对缓存的访问,并对其进行搜索和获取值。 像下面这样的

@ray.remote
def f(x):
    # create inputs from x
    # do work
    unknown_y1 = []
    obtained_y1 = []
    for index, y in enumerate(y1):
        key = '|'.join([str(x) for x in y.values()])
        if key in cached:
            obtained_y1.append(cached[key])
        else:
            obtained_y1.append(np.inf)
            unknown_y1.append(promo)

    unknown_y2 = []
    obtained_y2 = []
    for index, y in enumerate(y2):
        key = '|'.join([str(x) for x in y.values()])
        if key in cached:
            obtained_y2.append(cached[key])
        else:
            obtained_y2.append(np.inf)
            unknown_y2.append(baseline)

    known_y1, known_y2 = predictor.predict(unknown_y1,unknown_y2)

    unknown_index = 0
    for index in range(len(y1)):
        if(obtained_y1[index] == np.inf):
            obtained_y1[index] = known_y1[unknown_index]
            key = '|'.join([str(x) for x in y1[index].values()])
            if not(key in cached):
                cached[key] = obtained_y1[index]
            unknown_index = unknown_index+1
    
    unknown_index = 0
    for index in range(len(y2)):
        if(obtained_y2[index] == np.inf):
            obtained_y2[index] = known_y2[unknown_index]
            key = '|'.join([str(x) for x in y2[index].values()])
            if not(key in cached):
                cached[key] = obtained_y2[index]
            unknown_index = unknown_index+1

我曾尝试通过在脚本顶部添加global cached;cached=dict()来创建一个全局字典,但该变量似乎与其他工作人员的版本不同,并且不共享数据。以前我是用dogpile.cache.redis来实现这一点的,但是该区域将不可序列化,因为它使用了线程锁。我也尝试过创建一个dict并使用ray.put(cached)将其放入ray的对象存储中,但我想我在某个地方读到了ray cannot share dictionaries in memory

我目前正在尝试从每个辅助进程返回缓存,并将它们合并到主缓存中,然后再次将它们放入对象存储中。 是否有更好的方法在ray workers之间共享字典/缓存


Tags: keyinforindexifunknownvaluesjoin
2条回答

不幸的是,您没有创建minimal, reproducible example,因此我看不出您是如何进行多重处理的。为了便于讨论,我将假设您正在使用来自multiprocessing模块(concurrent.futures.ProcessPoolExecutorPool类作为类似的工具)。然后您想使用一个管理的,sharabledict,如下所示:

from multiprocessing import Pool, Manager


def init_pool(the_cache):
    # initialize each process in the pool with the following global variable:
    global cached
    cached = the_cache

def main():
    with Manager() as manager:
        cached = manager.dict()
        with Pool(initializer=init_pool, initargs=(cached,)) as pool:
            ... # code that creates tasks

# required by Windows:
if __name__ == '__main__':
    main()

这将在dictionary中使用变量cached创建对该dictionary的代理的引用。因此,所有字典访问本质上更类似于远程过程调用,因此执行速度比“正常”字典访问慢得多。只是要知道

如果有其他机制来创建worker(decorator @ray.remote?),那么cached变量可以作为参数传递给函数f

您可能对这个关于为Ray编写函数缓存的问题/答案感兴趣Implementing cache for Ray actor function

您的想法是正确的,但我认为您缺少的关键细节是,您应该使用Ray将全局状态保存在actor或对象存储中(如果是不可变的)

在您的情况下,看起来您正在尝试缓存远程功能的一部分,而不是整个功能。你可能想要这样的东西

<>这是一个简化的版本,你可以考虑如何编写你的函数。

@ray.remote
class Cache:
  def __init__(self):
    self.cache = {}

  def put(self, x, y):
    self.cache[x] = y

  def get(self, x):
    return self.cache.get(x)

global_cache = Cache.remote()

@ray.remote
def f(x):
  all_inputs = list(range(x)) # A simplified set of generated inputs based on x
  obtained_output = ray.get([global_cache.get(i) for i in all_inputs])

  unknown_indices = []
  for i, output in enumerate(obtained_output):
    if output is None:
        unknown_inputs.append(i)
 
  # Now go through and calculate all the unknown inputs
  for i in unknown_inputs:
    output = predict(all_inputs[i]) # calculate the output
    global_cache.put.remote(output) # Cache it so it's available next time
    obtained_output[i] = output

  return obtained_output

相关问题 更多 >