How to explain the "pyarrow.lib.ArrowIOError: HDFS file does not exist" error when trying to read a file from HDFS with Dask?

Posted 2024-06-16 11:45:37


I am using Dask distributed and I am trying to create a dataframe from a CSV stored in HDFS. I assume the connection to HDFS is successful, since I am able to print the names of the dataframe's columns. However, when I call the len function, or any other function, on the dataframe, I get the following error:

pyarrow.lib.ArrowIOError: HDFS file does not exist: /user/F43479/trip_data_v2.csv

I don't understand why I am getting this error. I would appreciate your input.

Here is my code:

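(The code block itself did not survive in this copy of the question. The sketch below is a rough reconstruction pieced together from the traceback and the answers: the scheduler address and the HDFS path appear elsewhere in the post, while the Hadoop-related environment variable values are placeholders.)

import os
import dask.dataframe as dd
from dask.distributed import Client

# Tell pyarrow's libhdfs driver where the local Hadoop and Java installs live
# (placeholder paths; the real values depend on the cluster)
os.environ['HADOOP_HOME'] = '/usr/hdp/current/hadoop-client'
os.environ['JAVA_HOME'] = '/usr/jdk64/jdk1.8.0_112'
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/hdp/current/hadoop-client/lib/native'

# Connect to the already-running Dask scheduler
client = Client('10.22.104.37:8786')

# Build the dataframe lazily from the CSV stored in HDFS
df = dd.read_csv('hdfs:///user/F43479/trip_data_v2.csv')

print(df.columns)  # works: only the client process reads from HDFS here
print(len(df))     # fails: this computation runs on the workers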

Here is the content of my HDFS home directory:

[F43479@xxxxx dask_tests]$ hdfs dfs -ls /user/F43479/
Found 9 items
-rw-r-----   3 F43479 hdfs            0 2019-03-07 16:42 /user/F43479/-
drwx------   - F43479 hdfs            0 2019-04-03 02:00 /user/F43479/.Trash
drwxr-x---   - F43479 hdfs            0 2019-03-13 16:53 /user/F43479/.hiveJars
drwxr-x---   - F43479 hdfs            0 2019-03-13 16:52 /user/F43479/hive
drwxr-x---   - F43479 hdfs            0 2019-03-15 13:23 /user/F43479/nyctaxi_trip_data
-rw-r-----   3 F43479 hdfs           36 2019-04-15 11:13 /user/F43479/test.csv
-rw-r-----   3 F43479 hdfs  50486731416 2019-03-26 17:37 /user/F43479/trip_data.csv
-rw-r-----   3 F43479 hdfs   5097056230 2019-04-15 13:57 /user/F43479/trip_data_v2.csv
-rw-r-----   3 F43479 hdfs 504867312828 2019-04-02 11:15 /user/F43479/trip_data_x10.csv

Finally, the full output of running the code:

Index(['vendor_id', 'passenger_count', 'trip_time_in_secs', 'trip_distance'], dtype='object')
Traceback (most recent call last):
  File "dask_pa_hdfs.py", line 32, in <module>
    print(len(df))
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/dataframe/core.py", line 438, in __len__
    split_every=False).compute()
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/base.py", line 397, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 2321, in get
    direct=direct)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 1655, in gather
    asynchronous=asynchronous)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 673, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 1500, in _gather
    traceback)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/core.py", line 133, in read_block_from_file
    with copy.copy(lazy_file) as f:
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/core.py", line 177, in __enter__
    f = SeekableFile(self.fs.open(self.path, mode=mode))
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/pyarrow.py", line 37, in open
    return self.fs.open(path, mode=mode, **kwargs)
  File "pyarrow/io-hdfs.pxi", line 431, in pyarrow.lib.HadoopFileSystem.open
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: HDFS file does not exist: /user/F43479/trip_data_v2.csv

2 Answers

You have carefully set up the environment in your local process, the one containing the client, so that it can talk to HDFS. That is enough for finding the columns, because Dask does that up front from the client process, using the first few rows of data. However:

client = Client('10.22.104.37:8786')

your scheduler and workers live elsewhere, and they do not have the environment variables you set. When your tasks run, the workers do not know how to find the file.

What you need to do is set up the environment on the workers as well. This can be done before they are launched, or afterwards:

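(The code block of this answer was also lost. A minimal sketch of the idea, running a small setup function on every worker with client.run, could look like the following; the environment variable values are again placeholders for whatever the client machine actually uses.)

def setenv():
    # Executed inside each worker process: recreate the HDFS-related
    # environment that was only set on the client machine
    import os
    os.environ['HADOOP_HOME'] = '/usr/hdp/current/hadoop-client'
    os.environ['JAVA_HOME'] = '/usr/jdk64/jdk1.8.0_112'
    os.environ['ARROW_LIBHDFS_DIR'] = '/usr/hdp/current/hadoop-client/lib/native'

# Run the function once on every worker currently connected to the scheduler
client.run(setenv)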

(this should return a set of Nones, one from each worker)

Note that if new workers come online dynamically, they will need to run this function before they access HDFS.
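If your version of distributed provides Client.register_worker_callbacks, one way to cover such workers (an assumption on my part, not something the answer itself states) is to register the same function as a setup callback, so that every future worker runs it as soon as it joins:

# Run setenv on all current workers and automatically on any worker added later
client.register_worker_callbacks(setup=setenv)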

I solved the problem. It was related to the permissions for accessing HDFS. I am working on a Kerberized HDFS cluster, with the Dask scheduler process launched on an edge node and the worker processes launched on the data nodes.
To access HDFS, pyarrow needs two things:

  • it must be installed on the scheduler and on all the workers
  • the environment variables must also be configured on every node

On top of that, to access HDFS, the launched process needs to be authenticated through Kerberos. When I ran the code from the scheduler process, I was able to connect to HDFS because my session was authenticated through Kerberos, which is why I could get the information about the columns of the CSV file.
However, since the worker processes were not authenticated, they could not access HDFS, and that caused the error. To solve it, we had to stop the worker processes, modify the script used to launch them so that it includes a Kerberos command to authenticate against HDFS (kinit something), and then restart the workers.
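The launch script itself is not shown in the answer. Purely as an illustration, and assuming a keytab is available on every data node, a similar per-worker authentication could also be triggered from the client with client.run and a shell call to kinit (hypothetical keytab path and principal below); the actual fix described above was to put the kinit call in the worker startup script instead.

import subprocess
from dask.distributed import Client

client = Client('10.22.104.37:8786')

def kerberos_auth():
    # Obtain a Kerberos ticket inside this worker process
    # (hypothetical keytab and principal)
    subprocess.run(
        ['kinit', '-kt', '/home/F43479/F43479.keytab', 'F43479@EXAMPLE.COM'],
        check=True,
    )

client.run(kerberos_auth)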
It works for now, but it means Dask is not really compatible with Kerberized clusters: with the configuration we ended up with, every user has the same permissions on HDFS when computations are launched from the workers. I don't think that is a completely secure way of doing things.
