非常大文件的优化处理
我的任务相对简单:我需要对输入文件中的每一行进行测试,看看这行是否满足一组特定的条件。如果满足,就把该行的特定列写入一个新文件。我写了一个Python脚本来完成这个工作,但我希望能得到一些帮助,主要在以下三个方面:1)提高速度,2)如何更好地处理列名(因为不同文件的列数可能不同),3)如何更好地指定我的过滤条件和想要的输出列。
1)我处理的文件包含天文图像的光度数据。每个文件大约有100万行,150列浮点数,通常大小超过1GB。我有一个老旧的AWK脚本,可以在大约1分钟内处理这样的文件;而我的Python脚本需要5到7分钟。我经常需要调整过滤条件并多次运行,直到输出文件符合我的要求,所以速度确实很重要。我发现for循环的速度还不错;问题在于我在循环内部的处理方式拖慢了速度。使用itemgetter来选择我想要的列比把整行读入内存要快很多,但我不太确定还有什么方法可以进一步提高速度。能否让它和AWK一样快呢?
2)我希望能通过列名而不是列号来处理数据,因为某些特定数据(比如光子计数、背景、信噪比等)的列号在不同文件中可能会变化。在我的AWK脚本中,我总是需要检查条件和输出列的列号是否正确,即使过滤和输出应用于相同的数据。我在Python中的解决方案是创建一个字典,将每个数据项与列号对应。当文件的列不同的时候,我只需要指定一个新的字典。也许还有更好的方法?
3)理想情况下,我只需要在脚本的顶部指定输入和输出文件的名称、过滤条件和想要输出的列,这样我就不需要在代码中到处寻找来调整某些内容。我的主要问题是未定义的变量。例如,一个典型的条件是'SNR > 4',但'SNR'(信噪比)在开始读取光度文件的行之前并没有被赋值。我的解决方案是使用字符串和eval/exec的组合。再一次,也许还有更好的方法?
我并没有接受过计算机科学的训练(我是一名天文学的研究生)——我通常只是把东西拼凑在一起,然后调试直到它能工作。然而,针对我上面提到的三个方面进行优化对我的研究变得极其重要。抱歉发了这么长的帖子,但我觉得这些细节会很有帮助。任何建议,包括如何清理代码和编码风格,都会非常感激。
非常感谢,
Jake
#! /usr/bin/env python2.6
from operator import itemgetter
infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'
# names must belong to dicitonary
conditions = 'OBJ <= 2 and SNR1 > 4 and SNR2 > 4 and FLAG1 < 8 and FLAG2 < 8 and (SHARP1 + SHARP2)**2 < 0.075 and (CROWD1 + CROWD2) < 0.1'
input = 'OBJ, SNR1, SNR2, FLAG1, FLAG2, SHARP1, SHARP2, CROWD1, CROWD2'
# should contain all quantities used in conditions
output = 'X, Y, OBJ, COUNTS1, BG1, ACS1, ERR1, CHI1, SNR1, SHARP1, ROUND1, CROWD1, FLAG1, COUNTS2, BG2, ACS2, ERR2, CHI2, SNR2, SHARP2, ROUND2, CROWD2, FLAG2'
# dictionary of col. numbers for the more important qunatities
columns = dict(EXT=0, CHIP=1, X=2, Y=3, CHI_GL=4, SNR_GL=5, SHARP_GL=6, ROUND_GL=7, MAJAX_GL=8, CROWD_GL=9, OBJ=10, COUNTS1=11, BG1=12, ACS1=13, STD1=14, ERR1=15, CHI1=16, SNR1=17, SHARP1=18, ROUND1=19, CROWD1=20, FWHM1=21, ELLIP1=22, PSFA1=23, PSFB1=24, PSFC1=25, FLAG1=26, COUNTS2=27, BG2=28, ACS2=29, STD2=30, ERR2=31, CHI2=32, SNR2=33, SHARP2=34, ROUND2=35, CROWD2=36, FWHM2=37, ELLIP2=38, PSFA2=39, PSFB2=40, PSFC2=41, FLAG2=42)
f = open(infile)
g = open(outfile, 'w')
# make string that extracts values for testing
input_items = []
for i in input.replace(',', ' ').split():
input_items.append(columns[i])
input_items = ', '.join(str(i) for i in input_items)
var_assign = '%s = [eval(i) for i in itemgetter(%s)(line.split())]' % (input, input_items)
# make string that specifies values for writing
output_items = []
for i in output.replace(',', ' ').split():
output_items.append(columns[i])
output_items = ', '.join(str(i) for i in output_items)
output_values = 'itemgetter(%s)(line.split())' % output_items
# make string that specifies format for writing
string_format = []
for i in output.replace(',', ' ').split():
string_format.append('%s')
string_format = ' '.join(string_format)+'\n'
# main loop
for line in f:
exec(var_assign)
if eval(conditions):
g.write(string_format % tuple(eval(output_values)))
f.close()
g.close()
6 个回答
这是我处理这种情况的方法...
在我的电脑上,这个运行大约需要35秒,而你原来的脚本则需要大约3分钟。其实还可以再做一些优化(比如我们只需要把几个列转换成浮点数),不过这样也只能再减少几秒钟的运行时间。
你也可以很简单地使用 csv.DictReader
,就像很多人建议的那样。我没有使用它,因为你需要定义一个自定义的格式,而用我现在的方法只需要多写几行代码就能实现同样的功能。(csv
模块的各种类会检查一些更复杂的情况(比如带引号的字符串等),但在这个特定的情况下你并不需要担心这些。虽然在很多情况下它们非常有用,但在这里用起来有点多余。)
注意,你也可以在调用脚本时轻松地将输入文件和输出文件的名字作为参数传入,而不是直接写死在代码里(比如 infile = sys.argv[0]
等等)。这样也能让你更方便地输入或输出数据...(你可以检查 sys.argv
的长度,然后根据需要把 infile
或 outfile
设置为 sys.stdin
和/或 sys.stdout
)
def main():
infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'
process_data(infile, outfile)
def filter_conditions(row):
for key, value in row.iteritems():
row[key] = float(value)
cond = (row['OBJ'] <= 2 and row['SNR1'] > 4
and row['SNR2'] > 4 and row['FLAG1'] < 8
and row['FLAG2'] < 8
and (row['SHARP1'] + row['SHARP2'])**2 < 0.075
and (row['CROWD1'] + row['CROWD2']) < 0.1
)
return cond
def format_output(row):
output_columns = ('X', 'Y', 'OBJ', 'COUNTS1', 'BG1', 'ACS1', 'ERR1', 'CHI1',
'SNR1', 'SHARP1', 'ROUND1', 'CROWD1', 'FLAG1', 'COUNTS2',
'BG2', 'ACS2', 'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2',
'CROWD2', 'FLAG2')
delimiter = '\t'
return delimiter.join((row[name] for name in output_columns))
def process_data(infilename, outfilename):
column_names = ('EXT', 'CHIP', 'X', 'Y', 'CHI_GL', 'SNR_GL', 'SHARP_GL',
'ROUND_GL', 'MAJAX_GL', 'CROWD_GL', 'OBJ', 'COUNTS1',
'BG1', 'ACS1', 'STD1', 'ERR1', 'CHI1', 'SNR1', 'SHARP1',
'ROUND1', 'CROWD1', 'FWHM1', 'ELLIP1', 'PSFA1', 'PSFB1',
'PSFC1', 'FLAG1', 'COUNTS2', 'BG2', 'ACS2', 'STD2',
'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 'CROWD2',
'FWHM2', 'ELLIP2', 'PSFA2', 'PSFB2', 'PSFC2', 'FLAG2')
with open(infilename) as infile:
with open(outfilename, 'w') as outfile:
for line in infile:
line = line.strip().split()
row = dict(zip(column_names, line))
if filter_conditions(row.copy()):
outfile.write(format_output(row) + '\n')
if __name__ == '__main__':
main()
我在这里的第一步,就是要去掉 exec()
和 eval()
这两个调用。每次你用 eval
执行一个字符串时,它都需要先编译,然后再执行,这样会增加你每行代码的开销。而且,使用 eval
往往会导致代码变得混乱,难以调试,所以一般来说应该尽量避免。
你可以开始重构代码,把逻辑放进一些小而易懂的函数里。例如,你可以用一个函数来替代 eval(conditions)
,比如:
def conditions(d):
return (d[OBJ] <= 2 and
d[SNRI] > 4 and
d[SNR2] > 4 and
d[FLAG1] < 8 and ...
小贴士:如果你的一些条件更容易出错,可以把它们放在前面,这样 Python 就会跳过后面的评估。
我建议去掉列名的字典,直接在文件的顶部定义一些变量,然后用 line[COLNAME]
来引用列名。这样可能会帮助你简化一些部分,比如条件函数,你可以直接用名字来引用列,而不需要为每个变量单独赋值。
我觉得你可能没提到,但看起来你的数据是以csv格式存储的。你可以试试使用 csv.DictReader,这个工具可以让你一行一行地读取文件(这样就不用一次性把整个文件都加载到内存里),而且你可以通过列的名字来引用数据。
如果你还没看过的话,建议你了解一下 cProfile,这是Python的性能分析工具。它可以告诉你程序中哪些部分花费的时间最多。