Processing nested JSON files in PySpark with partitioning


I have dict data nested in a JSON file stored in HDFS (daily data spanning 20 years). I want to process this data in PySpark, partitioned by the date column, and then load it into a Hive table.

I tried to first explode the nesting to get a flat structure, but I haven't figured out how to partition by date, since the data is nested, repeated, and dynamic. What should my approach be?

from pyspark.sql.functions import explode, array

df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')
dfRates = df.select(explode(array(df['rates']))).toDF("rates")
# field names containing dashes need backticks
dfdate = dfRates.select("rates.`2018-02-22`.NZD")

# Drop the duplicated rows based on the base and date columns,
# and fill missing rates with 0
forex_rates = (df.select('Date', 'base', 'rates_BGN', 'rates_CNY', 'rates_NZD')
                 .dropDuplicates(['base', 'Date'])
                 .fillna(0, subset=['rates_BGN', 'rates_CNY', 'rates_NZD']))

# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")
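
For reference, the write I'm ultimately aiming for would partition the Hive table on the date column, roughly like the sketch below (the table name forex_rates_partitioned is just a placeholder, and it assumes Spark is allowed to create the partitioned table):

# Sketch only: date-partitioned write (hypothetical table name)
forex_rates.write \
    .mode("append") \
    .partitionBy("Date") \
    .saveAsTable("forex_rates_partitioned")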

Thanks in advance.

sample data:
    
           {'rates': {
                        '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758}, 
                        '2018-01-09': {'BGN': 1.8558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
                      }, 
            'start_at': '2018-01-01', 
            'base': 'EUR', 
            'end_at': '2018-02-01'
           }

expected df structure:
            
+------------+------+-----------+-----------+-----------+
| Date       | Base | rates_BGN | rates_CNY | rates_NZD |
+------------+------+-----------+-----------+-----------+
| 2018-01-22 | EUR  | 1.9558    | 7.8374    | 1.6758    |
+------------+------+-----------+-----------+-----------+
| 2018-01-09 | EUR  | 1.8558    | 7.7865    | 1.6601    |
+------------+------+-----------+-----------+-----------+
| .......... | ...  | ......    | ......    | ......    |
+------------+------+-----------+-----------+-----------+

Tags: file, data, json, df, base, date, hdfs
1 Answer

This code may help you. Input JSON:

   {'rates': {
                '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758}, 
                '2018-01-09': {'BGN': 1.9558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
              }, 
    'start_at': '2018-01-01', 
    'base': 'EUR', 
    'end_at': '2018-02-01'
   }

Code

from pyspark.sql.functions import expr, lit

# Read the single multiline JSON document
df = spark.read.option("multiline", "true").json("file:///home/sathya/test-datasets/forex_rates.json")

# Pull out the scalar fields that apply to every row
base = df.select("base").rdd.collect()[0].asDict()["base"]
start_at = df.select("start_at").rdd.collect()[0].asDict()["start_at"]
end_at = df.select("end_at").rdd.collect()[0].asDict()["end_at"]

# Under "rates", every date is a separate struct column
df2 = df.select("rates.*")

# Build the stack() argument list: "n, 'date1', `date1`, 'date2', `date2`, ..."
# python 2
stack_characteristics = str(len(df2.columns)) + ',' + ','.join(["'{}',`{}`".format(v, v) for v in df2.columns])
# python 3
# stack_characteristics = str(len(df2.columns)) + ',' + ','.join([f"'{v}',`{v}`" for v in df2.columns])

# Unpivot the date columns into rows, expand the rate struct,
# then re-attach the scalar fields
(df2.select(expr('''stack({})'''.format(stack_characteristics)).alias('date', 'vals'))
    .select('date', 'vals.*')
    .withColumn("base", lit(base))
    .withColumn("start_at", lit(start_at))
    .withColumn("end_at", lit(end_at))
    .show())
'''
+----------+------+------+------+------+------+----+----------+----------+
|      date|   BGN|   CNY|   NOK|   NZD|   TRY|base|  start_at|    end_at|
+----------+------+------+------+------+------+----+----------+----------+
|2018-01-09|1.9558|7.7865|9.6715|1.6601|4.4843| EUR|2018-01-01|2018-02-01|
|2018-01-22|1.9558|7.8374|9.6223|1.6758|4.6552| EUR|2018-01-01|2018-02-01|
+----------+------+------+------+------+------+----+----------+----------+
'''
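
If the result should also match the column layout from the question (Date, Base, rates_BGN, rates_CNY, rates_NZD), a possible follow-up on top of the dataframe above is sketched here; the renamed columns and the dropDuplicates/fillna step mirror the question's own code and are not part of the original answer:

from pyspark.sql.functions import col, expr, lit

# Reuse df2, base and stack_characteristics from above, but keep the
# unpivoted result in a variable instead of calling .show()
rates_long = (df2.select(expr("stack({})".format(stack_characteristics)).alias("date", "vals"))
                 .select("date", "vals.*")
                 .withColumn("base", lit(base)))

# Rename to the structure asked for in the question
forex_rates = (rates_long
               .select(col("date").alias("Date"),
                       col("base").alias("Base"),
                       col("BGN").alias("rates_BGN"),
                       col("CNY").alias("rates_CNY"),
                       col("NZD").alias("rates_NZD"))
               .dropDuplicates(["Base", "Date"])
               .fillna(0, subset=["rates_BGN", "rates_CNY", "rates_NZD"]))

forex_rates.show()

From there, the dataframe can be written into Hive partitioned by Date, as in the write step at the end of the question.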
