Translating Scala code to rename and move a CSV file (Spark, PySpark)

Posted 2024-03-29 15:53:53


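I use the Scala code below to rename a CSV file to a TXT file and then move the TXT file. I need to translate this code to Python/PySpark, but I am running into some problems (I am not proficient in Python). I would really appreciate your help. Thanks in advance!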

//Prepare to rename file
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)

//Create variables
val table_name = dbutils.widgets.get("table_name") // getting table name
val filePath = "dbfs:/mnt/datalake/" + table_name + "/" // path where original csv file name is located
val fileName = fs.globStatus(new Path(filePath+"part*"))(0).getPath.getName // getting original csv file name
val newfilename = table_name + ".txt" // renaming and transforming csv into txt
val curatedfilePath = "dbfs:/mnt/datalake/" + newfilename // curated path + new file name

//Move to curated folder
dbutils.fs.mv(filePath + fileName, curatedfilePath)

Here is the Python code:

%python

#Create variables
table_name = dbutils.widgets.get("table_name") # getting table name
filePath = "dbfs:/mnt/datalake/" + table_name + "/" # path where original csv file name is located
newfilename = table_name + ".txt" # transforming csv into txt
curatedfilePath = "dbfs:/mnt/datalake/" + newfilename # curated path + new file name

#Save CSV file
df_curated.coalesce(1).replace("", None).write.mode("overwrite").save(filePath,format='csv', delimiter='|', header=True, nullValue=None)

# getting original csv file name
for f in filePath:
            if f[1].startswith("part-00000"): 
                 original_file_name = f[1]

#move to curated folder
dbutils.fs.mv(filePath + fileName, curatedfilePath)

I am having trouble with the "getting original csv file name" part. It throws the following error:

IndexError: string index out of range
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<command-3442953727364942> in <module>()
     11 # getting original csv file name
     12 for f in filePath:
---> 13             if f[1].startswith("part-00000"):
     14                  original_file_name = f[1]
     15 

IndexError: string index out of range

Tags: csv, path, name, table, val, fs, file, original
1 answer
User
Reply #1 · Posted 2024-03-29 15:53:53

In the Scala code, fs.globStatus is used to list the part files in the folder where the DataFrame was saved.

In Python, you can likewise reach hadoop.fs through the JVM, as follows:

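# Reach the Hadoop FileSystem API through the JVM gateway behind the SparkContext
conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

# filePath already ends with "/", so append only the glob pattern
part_files = Path(filePath).getFileSystem(conf).globStatus(Path(filePath + "part*"))
file_name = part_files[0].getPath().getName()  # name of the first part file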
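
As for the IndexError itself: `for f in filePath` loops over the characters of the path string, so every `f` is a one-character string and `f[1]` is out of range. If you would rather stay in plain Python, here is a minimal sketch. It assumes you are in a Databricks notebook and list the folder with `dbutils.fs.ls`, whose entries expose a `name` attribute. `find_part_file` is a hypothetical helper name made up for this example, not part of any library, and `sample_path` is an illustrative value only.

```python
def find_part_file(names, prefix="part-"):
    """Return the first file name that starts with `prefix` (hypothetical helper)."""
    for name in names:
        if name.startswith(prefix):
            return name
    raise FileNotFoundError(f"no file starting with {prefix!r} found")


# Why the original loop failed: iterating over a string yields single characters.
sample_path = "dbfs:/mnt/datalake/my_table/"  # illustrative value only
first_items = [f for f in sample_path][:3]    # three one-character strings
# Indexing any of these with [1] raises IndexError, because each has length 1.

# On Databricks (assumption: `dbutils` is available in the notebook):
# names = [info.name for info in dbutils.fs.ls(filePath)]
# original_file_name = find_part_file(names)
# dbutils.fs.mv(filePath + original_file_name, curatedfilePath)
```

Whichever way you get the name, pass that same variable to `dbutils.fs.mv`. The Python snippet in the question assigns `original_file_name` but then moves `filePath + fileName`, and `fileName` is never defined there.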
