如何根据Python中json文件的最新时间戳条件创建和写入文件?

2024-04-23 08:17:06 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我的大json文件的一部分。我正在用yamlPath的内容创建文件夹,然后用sqlQuery的内容写入文件。但是在重复“yamlName+yamlPath”的情况下,我的代码只创建了第一个结果的文件,并插入了它从for循环获得的sqlQuery内容。因此,在这个重复的场景中,我只想选取sqlQuery,并基于最新的/max jobEndTimestamp创建一个文件。你知道吗

如何使代码只获取最新的jobEndTimestamp并创建文件?你知道吗

预期产量:

/app/computer/users/ship-notice-data.sql   -> select from table.b
/app/computer/a/users/boat-notice-data.sql -> select from table.b
{
"stream": [
{
  "applicationServiceId": "uhhj",
  "yamlName": "/users/ship-notice-data.yml",
  "yamlPath": "/app/computer",
  "jobStartTimestamp": "2018-09-15 04:12:46",
  "jobEndTimestamp": "2018-09-15 04:15:29",
  "sourceHostName": "Teradata",
  "sourceType": "Teradata",
  "targetHostName": "DB2",
  "targetType": "DB2",
  "sqlQuery": "select from table.a"
},{
  "applicationServiceId": "uhhj",
  "yamlName": "/users/ship-notice-data.yml",
  "yamlPath": "/app/computer",
  "jobStartTimestamp": "2018-09-15 21:12:46",
  "jobEndTimestamp": "2018-09-15 21:15:29",
  "sourceHostName": "Teradata",
  "sourceType": "Teradata",
  "targetHostName": "DB2",
  "targetType": "DB2",
  "sqlQuery": "select from table.b"
},{
  "applicationServiceId": "uhhj",
  "yamlName": "/users/car-notice-data.yaml",
  "yamlPath": "/app/computer/s",
  "jobStartTimestamp": "2018-09-15 04:12:46",
  "jobEndTimestamp": "2018-09-15 06:15:29",
  "sourceHostName": "Teradata",
  "sourceType": "Teradata",
  "targetHostName": "DB2",
  "targetType": "DB2",
  "sqlQuery": "select from table.b"
},{
  "applicationServiceId": "uhhj",
  "yamlName": "/users/boat-notice-data.yaml",
  "yamlPath": "/app/computer/a",
  "jobStartTimestamp": "2018-09-15 04:12:46",
  "jobEndTimestamp": "2018-09-15 06:15:29",
  "sourceHostName": "Teradata",
  "sourceType": "Teradata",
  "targetHostName": "DB2",
  "targetType": "DB2",
  "sqlQuery": "select from table.b"
}
]
}

这是我的密码:

with open('/Users/mona/stream.json', 'r') as f:
    item_dict = json.load(f)

for item in item_dict['stream']:
    if (item['applicationServiceId'] == 'uhhj' and 
            item.get('targetHostName') == 'DB2' and
            (item['targetType'] == 'DB2')):
        # print(item.get('applicationServiceId'))
        v3 = item.get('applicationServiceId')
        v4 = item.get('jobEndTimestamp')
        v = item.get('sqlQuery')
        v1 = item.get('yamlName')
        v2 = item.get('yamlPath')
        print(v1+v2+" "+v4+ " " +str(v))
        # v4 = str(item.get('yamlName').split('/')[-1].split('.')[0])
        # print(v4)

        originalPath = "/Users/mona/"

        fullPath = os.path.join(originalPath+v2+(v1.split('/')[1].split('/')[0])+'/'+(v1.split('/')[2].split('/')[0])+'/')
        # print(fullPath)
        os.makedirs(fullPath, mode=0o777, exist_ok=True)
        # print(v1)

        with open(fullPath + str(item.get('yamlName').split('/')[-1].split('.')[0]) + ".sql", "w") as newFile:
            newFile.write("%s \n" % (v))

Tags: appdatagetitemselectuserscomputernotice
2条回答

也许这个解决方案是次优的,但它可以工作。。。你知道吗

其思想是按yamlPath对字典进行分组,每个路径都包含一个字典列表,其中yamlPath。你知道吗

然后,您拆分并重新连接路径,并保存名称(这样它就可以与您的操作系统兼容),最后,我只是打开上下文,以便您可以附加到刚刚打开的文件(如果不存在则创建,如果存在则打开并附加),这就是'a'参数的原因。你知道吗

from collections import defaultdic
import os
objects = defaultdic(list)

for item in item_dict['stream']:
    objects[item.yamlPath].append(item)
for yamlPath in objects.keys():
    for item in yamlPath:
        temp = yamlPath.split("/") + item.yamlName.split("/")[:-1]
        name = item.yamlName.split("/")[-1]
        file_with_path = os.path.join(*temp)
        with open(os.path.join(*temp, name), 'a') as file:
            pass # Do what ever to the content

同样,这只是一个想法,它可能会起作用,(还没有尝试过)并且您在创建dir时遇到问题,但是如果问题仍然是时间戳,那么遵循与此相同的逻辑,按照您想要的时间戳对它们进行分组,然后只需迭代max(objects.keys())(假设这些日期是可比较的,如果不只是强制转换它们的话)

要使用pandas实现这一点,您可以使用数据创建一个dataframe,通过它添加一个名为fullPath的新列group,然后过滤到maxjobEndTimestamp。像这样:

import pandas as pd

with open('/Users/mona/stream.json', 'r') as f:
    item_dict = json.load(f.read())

df = pd.DataFrame(item_dict['sources'])
df['fullPath'] = df['yamlPath'] + df['yamlName']
grouped = df.groupby('fullPath').apply(lambda d: d[d['jobEndTimestamp'] == d['jobEndTimestamp'].max()])

print grouped['sqlQuery']

将屈服

fullPath
/app/computer/a/users/boat-notice-data.yaml  3    select from table.b
/app/computer/s/users/car-notice-data.yaml   2    select from table.b
/app/computer/users/ship-notice-data.yml     1    select from table.b
Name: sqlQuery, dtype: object

相关问题 更多 >