使用分布式Clus的Python多处理

2024-05-29 07:37:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在寻找一个python包,它不仅可以跨一台计算机中的不同核心进行多处理,还可以跨多台计算机分布一个集群。有很多不同的python包用于分布式计算,但大多数似乎需要更改代码才能运行(例如,前缀指示对象在远程计算机上)。具体来说,我希望尽可能接近多处理pool.map函数。例如,如果在一台机器上,脚本是:

from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)

那么,分布式集群的伪代码是:

from distprocess import Connect, Pool, Cluster

pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)

Tags: 代码fromimportmap计算机connect集群function
3条回答

如果你想要一个非常简单的解决方案,没有一个。

但是,有一种解决方案具有multiprocessing接口pathos,它能够通过并行映射建立到远程服务器的连接,并执行多处理。

如果你想有一个ssh隧道连接,你可以这样做…或者如果你可以用一个不那么安全的方法,你也可以这样做。

>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel       
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>> 
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
...   from time import sleep
...   sleep(1.0)
...   return x**2
... 
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

或者,您可以配置直接连接(无ssh)

>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

这有点难,要让远程服务器工作,您必须事先在指定端口的remote.computer.com上启动一个运行的服务器,并且必须确保本地主机和远程主机上的设置都允许直接连接或ssh隧道连接。另外,您需要在每个主机上运行相同版本的pathospp分叉。另外,对于ssh,您需要运行ssh代理以允许使用ssh进行无密码登录。

但是,如果您的函数代码可以通过dill.source.importable传输到远程主机,那么希望这一切都能正常工作。

仅供参考,pathos早就应该发布了,而且基本上,在新的稳定版本被剪切之前,有一些bug和接口更改需要解决。

我建议你看看Ray,它的目的正是要做到这一点。

Ray在单机多核设置中使用与在分布式设置中相同的语法并行化代码。如果您愿意使用for循环而不是map调用,那么您的示例如下所示。

import ray
import time

ray.init()

@ray.remote
def function(x):
    time.sleep(0.1)
    return x

arglist = [1, 2, 3, 4]

result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)

它将使用本地拥有的多个内核并行运行四个任务。要在集群上运行相同的示例,唯一会更改的行是对ray.init()的调用。相关文档可以在here中找到。

注意,我正在帮助开发Ray

在这里的聚会有点晚了,但由于我也在寻找类似的解决方案,而且这个问题仍然没有被标记为答案,我想我会贡献我的发现。

我最后用了SCOOP。它提供了一个可以跨多个核心、跨多个主机工作的并行映射实现。如果在调用期间需要的话,它还可以返回到Python的串行map函数。

从独家新闻的介绍页面,它引用了以下特点:

SCOOP features and advantages over futures, multiprocessing and similar modules are as follows:

  • Harness the power of multiple computers over network;
  • Ability to spawn multiple tasks inside a task;
  • API compatible with PEP-3148;
  • Parallelizing serial code with only minor modifications;
  • Efficient load-balancing.

它确实有一些怪癖(函数/类必须是可pickle的),如果它们不共享相同的文件系统架构,那么在多个主机上顺利运行的设置可能会很乏味,但总的来说,我对结果很满意。就我们的目的而言,做了相当多的Numpy&Cython,它提供了出色的性能。

希望这有帮助。

相关问题 更多 >

    热门问题