Replicating a dataset to all workers with dask

Posted 2024-04-25 08:13:10


I am using dask with the distributed scheduler. I am trying to replicate a dataset, read from a CSV file on S3, to all worker nodes. Example:

from distributed import Executor
import dask.dataframe as dd

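# Connect to the scheduler and make it the default for dask computations
e = Executor('127.0.0.1:8786', set_as_default=True)
# blocksize=None loads each file as a single partition
df = dd.read_csv('s3://bucket/file.csv', blocksize=None)
# Load the dataframe into distributed memory
df = e.persist(df)
# Try to copy the data to every worker; this call raises the error below
e.replicate(df)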

distributed.utils - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/utils.py", line 102, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/executor.py", line 1347, in _replicate
    branching_factor=branching_factor)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/core.py", line 444, in send_recv_from_rpc
    result = yield send_recv(stream=stream, op=key, **kwargs)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/core.py", line 345, in send_recv
    six.reraise(*clean_exception(**response))
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/core.py", line 211, in handle_stream
    result = yield gen.maybe_future(handler(stream, **msg))
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
    yielded = next(result)
  File "/root/.miniconda/envs/dask_env/lib/python3.5/site-packages/distributed/scheduler.py", line 1324, in replicate
    keys = set(keys)
TypeError: unhashable type: 'list'

Is this the correct way to replicate a dataframe? It seems that the object returned by persist does not work with replicate for some reason.
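The last frame of the traceback shows set(keys) raising the TypeError, so at least one key reached the scheduler as a list rather than as a hashable value. The sketch below reproduces that failure in plain Python, without dask. It is only an illustration: the key names are hypothetical, the helper to_hashable is not part of dask or distributed, and the idea that the dataframe's tuple keys were turned into lists on the way to the scheduler is an assumption, not something the traceback confirms.

```python
def to_hashable(key):
    """Recursively convert lists to tuples so the key can be stored in a set.

    Hypothetical helper for illustration; not a dask/distributed function.
    """
    if isinstance(key, list):
        return tuple(to_hashable(part) for part in key)
    return key


# Hypothetical keys shaped like (name, partition index) pairs,
# but represented as lists instead of tuples (assumption).
list_keys = [["read-csv-abc", 0], ["read-csv-abc", 1]]

# Building a set directly from list-shaped keys fails, as in the traceback.
try:
    set(list_keys)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)

# Converting each key to a tuple first makes the same call succeed.
hashable_keys = {to_hashable(k) for k in list_keys}
print(sorted(hashable_keys))
```

If this reading is right, the problem lies in how replicate handles the keys of a persisted dataframe rather than in the read_csv or persist calls themselves.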

