无法在Python 3的子进程中使用distributed的LocalCluster

2024-04-24 03:00:30 发布

您现在位置:Python中文网/ 问答频道 /正文

在Python 3的子进程中使用distributed的LocalCluster时出现了一个错误(在Python 2下可以正常工作)。下面是一个最小示例(我使用的是Python 3.6、distributed 1.23.3和tornado 5.1.1):

import multiprocessing

from distributed import LocalCluster
from distributed import Client



def call_client(cluster_address):
    """Open a ``Client`` against ``cluster_address`` and leave it again.

    The ``with`` body is intentionally empty: the function only enters and
    exits the client's context manager. In the reported run, the
    ``TimeoutError`` is raised while the ``Client`` is being constructed here.
    """
    with Client(cluster_address):
        pass


def main():
    """Create a two-worker LocalCluster and connect to it from a child process.

    Prints the cluster's workers, then runs ``call_client`` with the scheduler
    address in a ``multiprocessing.Process`` and waits for it to finish.
    """
    local_cluster = LocalCluster(n_workers=2)
    print(local_cluster.workers)

    scheduler_address = local_cluster.scheduler.address
    child = multiprocessing.Process(
        target=call_client,
        args=(scheduler_address,),
    )
    child.start()
    child.join()


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

执行文件时,我收到以下错误消息:

user@9b97e84a3c58:/workspace$ python test.py
[<Nanny: tcp://127.0.0.1:35779, threads: 2>, <Nanny: tcp://127.0.0.1:40211, threads: 2>]
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 10, in call_client
    with Client(cluster_address):
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 610, in __init__
    self.start(timeout=timeout)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 733, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/home/user/venv/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 821, in _start
    yield self._ensure_connected(timeout=timeout)
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 862, in _ensure_connected
    self._update_scheduler_info())
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

Tags: run in py self client home venv lib
2条回答

使用spawn似乎是可行的。我怀疑是某些状态在fork时没有被正确处理。

# Build the child process from the 'spawn' multiprocessing context.
# NOTE(review): `...` appears to be a placeholder for the usual target/args.
process = multiprocessing.get_context('spawn').Process(...)

因为我最初的问题是在flask应用程序中启动子进程,所以我不能像MRocklin在另一个答案中建议的那样使用'spawn'。我目前可行的解决方案是:不在主进程中调用cluster = LocalCluster(n_workers=2),而是在一个子进程中启动集群:

import sys
import multiprocessing
import signal
from functools import partial

from distributed import LocalCluster
from distributed import Client


def _stop_cluster(cluster, *args):
    """Close ``cluster`` and exit the current process with status 0.

    Any extra positional arguments are accepted and ignored, which lets the
    function be bound with ``functools.partial`` and installed as a signal
    handler (handlers are called with the signal number and stack frame).
    """
    cluster.close()
    # Equivalent to sys.exit(0): terminate the process with a success status.
    raise SystemExit(0)


def _start_local_cluster(q, n_workers):
    """Run a LocalCluster in this process until SIGTERM is received.

    Args:
        q: Queue on which the scheduler address is published (via ``q.put``)
            once the cluster has been created.
        n_workers: Number of workers passed to ``LocalCluster``.

    This function does not return normally: on SIGTERM the installed handler
    closes the cluster and exits the process.
    """
    cluster = LocalCluster(n_workers=n_workers)

    # Install the shutdown handler *before* publishing the address. Once the
    # parent has the address it may terminate this process at any time, and a
    # SIGTERM arriving before the handler is in place would kill the process
    # without closing the cluster.
    signal.signal(signal.SIGTERM, partial(_stop_cluster, cluster))
    q.put(cluster.scheduler.address)

    # signal.pause() returns after any handled signal, so loop to keep the
    # process alive until _stop_cluster exits it.
    while True:
        signal.pause()


def call_client(cluster_address):
    """Open a ``Client`` against ``cluster_address`` and print a confirmation."""
    client = Client(cluster_address)
    with client:
        print("I am working")


def main():
    """Start the cluster in one child process and use it from another.

    A first child process runs ``_start_local_cluster`` with two workers and
    reports the scheduler address through a queue. A second child process
    runs ``call_client`` against that address. The cluster process is always
    terminated and joined afterwards.
    """
    q = multiprocessing.Queue()
    p_dask = multiprocessing.Process(target=_start_local_cluster, args=(q, 2))
    p_dask.start()
    try:
        # Blocks until the cluster process has published its address.
        cluster_address = q.get()

        process = multiprocessing.Process(
            target=call_client, args=(cluster_address, )
        )
        process.start()
        process.join()
    finally:
        # Stop the cluster process even if the client side raised, and join
        # it so the terminated child is reaped instead of left as a zombie.
        p_dask.terminate()
        p_dask.join()


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

相关问题 更多 >