PythonSpark:需要从文件列执行配置单元查询

2024-06-16 19:08:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含如下行的文件(文件名:sample.csv

Id,Query
T1012,"Select * from employee_dim limit 100"
T1212,"Select * from department_dim limit 100"
T1231,"Select dept_number,location,dept_name from locations"

我需要遍历这个文件(sample.csv)并获取第二列(“query”),在配置单元数据库中运行它并获得结果,然后将其保存到一个名为T1012_result.csv的新文件中,并对所有行执行同样的操作。在

你能帮忙吗?在

我尝试通过spark读取文件并将其转换为一个列表,然后使用sparksession执行SQL查询,但sparksession不起作用。在

^{pr2}$

Tags: 文件csvsamplefromid文件名employeequery
1条回答
网友
1楼 · 发布于 2024-06-16 19:08:13

更新:spark

file_path="file:///user/vikrant/inputfiles/multiquery.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)

+ -+               -+
|id |query                          |
+ -+               -+
|1  |select * from exampledate      |
|2  |select * from test             |
|3  |select * from newpartitiontable|
+ -+               -+

def customFunction(row):
    for row in df.rdd.collect():
        item=(row[1])
        filename=(row[0])
        query=""
        query+=str(item)
        newdf=spark.sql(query)
        savedataframe(newdf,filename)

def savedataframe(newdf,filename):
    newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")

customFunction(df)

drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv

更新:使用pandas 我在sql server上有几个测试表,正如您在问题中提到的,我正在将它们读到pandas dataframe,并将把查询结果保存到每个不同的文件中,并将重命名为dataframe的第一列:

^{pr2}$

输出文件名为:

outfile1.txt这将有表User_Stage_Table的数据

outfile2.txt这将有表User_temp_Table'的数据

告诉我这是否解决了你的问题或面临任何进一步的问题。在

相关问题 更多 >