Pyspark无法从pathlib对象加载

2024-06-16 08:33:00 发布

您现在位置:Python中文网/ 问答频道 /正文

Python Version 3.7.5
Spark Version 3.0
Databricks Runtime 7.3

我目前正在使用datalake文件系统中的路径

这是

p = dbutils.fs.ls('dbfs:/databricks-datasets/nyctaxi')
print(p)
 [FileInfo(path='dbfs:/databricks-datasets/nyctaxi/readme_nyctaxi.txt', name='readme_nyctaxi.txt', size=916),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/reference/', name='reference/', size=0),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/taxizone/', name='taxizone/', size=0),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/tripdata/', name='tripdata/', size=0)]

现在,为了将其转换为有效的Pathlib Posix对象,我通过一个函数传递它

def create_valid_path(paths):
    return Path('/dbfs').joinpath(*[part for part in Path(paths).parts[1:]])

tripdata的输出为

PosixPath('/dbfs/databricks-datasets/nyctaxi/tripdata')

现在,如果我想在将CSV的子集收集到一个列表中之后将其读入sparkdata框架中

from pyspark.sql.functions import * 
df = spark.read.format('csv').load(paths)

这是回报

AttributeError: 'PosixPath' object has no attribute '_get_object_id'

现在,我唯一能做到这一点的方法是手动在路径dbfs:/..前面加上前缀,并将每个项返回到一个字符串,但是需要使用Pathlib来执行一些基本的I/O操作。我是否缺少一些简单的东西,或者Pyspark不能读取pathlib对象

例如

trip_paths_str = [str(Path('dbfs:').joinpath(*part.parts[2:])) for part in trip_paths]

print(trip_paths_str)

['dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-01.csv.gz',
 'dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-02.csv.gz'...]

Tags: csvpathnamesizedatasetspathstripdatapart
1条回答
网友
1楼 · 发布于 2024-06-16 08:33:00

那么改做这个怎么样

from pyspark.sql.functions import * 
import os

def db_list_files(file_path):
  file_list = [file.path for file in dbutils.fs.ls(file_path) if os.path.basename(file.path)]
  return file_list

files = db_list_files('dbfs:/FileStore/tables/')
 
df = spark.read.format('text').load(files)
df.show()

相关问题 更多 >