Reading a 5GB JSON file with one similar JSON object per line and splitting it into multiple JSON files
I have a file that contains one JSON object per line. I need to split it into multiple well-formed JSON files, with a configurable number of records per output file.
I tried the code below, but counting the number of records in the file takes far too long.
The total processing time is about 18 minutes per file, and I need to bring that down. Here is some sample data:
Input:
{"job": "developer"}
{"job": "taxi driver"}
{"job": "police"}
Output:
{
  "ROOT": [
    {
      "job": "developer"
    },
    {
      "job": "taxi driver"
    },
    {
      "job": "police"
    }
  ]
}
The code I tried, which spends far too long just getting the record count:
import os
import json
import glob
import time
import shutil

start = time.time()

def filesplit(fInputname, farchivedir, foutdir):
    File_Extension = '.json'
    # Strip the .json extension from the output path so a part number can be appended
    x = foutdir.find(File_Extension)
    res = foutdir[0:x]
    with open(fInputname, 'r', encoding='utf-8') as f1:
        ll = [json.loads(line.strip()) for line in f1.readlines()]
    # Total number of records in the input json file
    print(len(ll))
    # 50000 means we are getting splits of 50000 json objects
    size_of_the_split = 50000
    total = len(ll) // size_of_the_split
    # Number of files getting generated
    print(total + 1)
    for i in range(total + 1):
        jsonData = ll[i * size_of_the_split:(i + 1) * size_of_the_split]
        json.dump({'ROOT': jsonData}, open(res + "_" + str(i + 1) + ".json", 'w', encoding='utf8'),
                  ensure_ascii=False, indent=True)
    shutil.move(fInputname, farchivedir)

for name in glob.glob("C:\\Users\\JSON\\Input\\*.json"):
    print(name)
    filesplit(name,
              name.replace('C:\\Users\\JSON\\Input', 'C:\\Users\\JSON\\OriginalFiles_BKP'),
              name.replace('C:\\Users\\JSON\\Input', 'C:\\Users\\JSON\\Output'))

end = time.time()
print('completed')
print("The time of execution of above program is :",
      (end - start) * 10**3, "ms")
2 Answers
Answer 1 (score: -1)
Don't read all of the data into memory at once. Process the file in chunks instead, using itertools.islice on the file iterator:
import itertools
import json
import pathlib

def file_split(
    json_lines: pathlib.Path, output_dir: pathlib.Path, chunksize: int = 50000
) -> None:
    """
    Splits a newline-delimited JSON file into smaller chunks and saves them in the specified output directory.
    Output files are named after the `json_lines` file, with a running chunk number appended.
    Args:
        json_lines (pathlib.Path): Path to the newline-delimited JSON file to split.
        output_dir (pathlib.Path): Directory where the split JSON files will be saved.
        chunksize (int, optional): Size of each chunk in terms of lines. Defaults to 50,000.
    Returns:
        None
    """
    name = json_lines.with_suffix("").name
    with json_lines.open("r", encoding="utf-8") as f_in:
        for i in itertools.count(start=1):
            # Pull at most `chunksize` lines from the file iterator
            chunk = list(itertools.islice(f_in, chunksize))
            if not chunk:
                break
            data = list(map(json.loads, chunk))
            with (output_dir / f"{name}_{i}.json").open("w", encoding="utf8") as f_out:
                json.dump({"ROOT": data}, f_out, ensure_ascii=False, indent=True)
You can use it like this (note that I've moved the archiving step out of the function):
import shutil

output_dir = pathlib.Path('C:\\Users\\JSON\\Output')
archive_dir = pathlib.Path('C:\\Users\\JSON\\OriginalFiles_BKP')

for file in pathlib.Path("C:\\Users\\JSON\\Input\\").glob("*.json"):
    file_split(file, output_dir)
    shutil.move(file, archive_dir)
Also, I'd recommend using pathlib whenever you work with paths.
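For example, the string slicing and replace() calls in the original script can be expressed directly with pathlib (a minimal sketch; the directory and file names below are only illustrative, borrowed from the question):

import pathlib

input_file = pathlib.Path("C:\\Users\\JSON\\Input\\jobs.json")   # hypothetical input file
output_dir = pathlib.Path("C:\\Users\\JSON\\Output")

# .stem drops the ".json" suffix, and "/" joins paths without any string surgery
part_path = output_dir / f"{input_file.stem}_1.json"
print(part_path)   # C:\Users\JSON\Output\jobs_1.json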
Answer 2 (score: 0)
I think I would try to bypass the json package entirely and hand-craft the result files, without holding all of the rows in memory at once.
Something like this might work:
MAX_ROWS_PER_FILE = 2
NEW_LINE = "\n"
OPENER = f"{{{NEW_LINE} \"ROOT\": ["
CLOSER = f"{NEW_LINE} ]{NEW_LINE}}}"

file_out = None
with open("file_test.json_records", encoding="utf-8", newline="") as filein:
    for index, row in enumerate(filein):
        if index % MAX_ROWS_PER_FILE == 0:
            is_first_row = True
            # Close the previous part file (if any) and start a new one
            if file_out:
                file_out.write(CLOSER)
                file_out.close()
            file_out = open(f"file_test_{index // MAX_ROWS_PER_FILE}.json", "w", encoding="utf-8", newline="")
            file_out.write(OPENER)
        if is_first_row:
            file_out.write(f"{NEW_LINE} {row.strip()}")
            is_first_row = False
        else:
            file_out.write(f"{NEW_LINE} ,{row.strip()}")
# Close the last part file; the guard covers an empty input file
if file_out:
    file_out.write(CLOSER)
    file_out.close()
Given a file_test.json_records file with this content:
{"job": "developer"}
{"job": "taxi driver"}
{"job": "police"}
this should produce two files:
file_test_0.json
{
 "ROOT": [
 {"job": "developer"}
 ,{"job": "taxi driver"}
 ]
}
and
file_test_1.json
{
 "ROOT": [
 {"job": "police"}
 ]
}
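Since these part files are assembled by hand rather than written through json.dump, a quick sanity check that every generated part still parses as valid JSON may be worthwhile, e.g. (a minimal sketch; the glob pattern is just an assumption about where the parts end up):

import glob
import json

# Hypothetical pattern matching the generated part files
for part in glob.glob("file_test_*.json"):
    with open(part, encoding="utf-8") as f:
        doc = json.load(f)   # raises json.JSONDecodeError if the file is malformed
    print(part, len(doc["ROOT"]), "records")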