Python 字符串处理 -- 性能问题
我有一段代码,在我的应用程序中大约执行了200万次,用来解析这么多记录。这个部分似乎是性能瓶颈,我想知道有没有人能帮我,给我一些聪明的技巧,让这些简单的字符串操作变得更快。
try:
data = []
start = 0
end = 0
for info in self.Columns():
end = start + (info.columnLength)
slice = line[start:end]
if slice == '' or len(slice) != info.columnLength:
raise 'Wrong Input'
if info.hasSignage:
if(slice[0:1].strip() != '+' and slice[0:1].strip() != '-'):
raise 'Wrong Input'
if not info.skipColumn:
data.append(slice)
start = end
parsedLine = data
except:
parsedLine = False
6 个回答
关于(使用一些类来提供一个可执行的例子):
class Info(object):
columnLength = 5
hasSignage = True
skipColumn = False
class Something(object):
def Columns(self):
return [Info()]*4
def bottleneck(self):
try:
data = []
start = 0
end = 0
line = '+this-is just a line for testing'
for info in self.Columns():
start = end
collength = info.columnLength
end = start + collength
if info.skipColumn: # start with this
continue
elif collength == 0:
raise ValueError('Wrong Input')
slice = line[start:end] # only now slicing, because it
# is probably most expensive part
if len(slice) != collength:
raise ValueError('Wrong Input')
elif info.hasSignage and slice[0] not in '+-': # bit more compact
raise ValueError('Wrong Input')
else:
data.append(slice)
parsedLine = data
except:
parsedLine = False
Something().bottleneck()
补充说明:
当切片的长度为0时,slice[0]是不存在的,所以必须先检查if collength == 0
。
补充说明2:
你在很多行中使用这段代码,但列的信息是不会变的,对吧?这让你可以:
- 提前计算每一列的起始点(不再需要计算起始和结束位置)
- 提前知道起始和结束位置,.Columns()只需要返回那些没有被跳过且列长度大于0的列(或者你真的需要在每一行都检查长度是否为0吗?)
- 每一行的必需长度是已知的,并且每一行都相等,可以在遍历列信息之前进行检查
补充说明3:
我想知道如果你使用'skipColumn',你怎么知道哪个数据索引属于哪个列呢……
def fubarise(data):
try:
if nasty(data):
raise ValueError("Look, Ma, I'm doing a big fat GOTO ...") # sheesh #1
more_of_the_same()
parsed_line = data
except ValueError:
parsed_line = False
# so it can be a "data" or False -- sheesh #2
return parsed_line
在raise
语句中使用不同的错误信息没有意义,因为这些信息根本不会被看到。真是让人无奈啊 #3。
更新:这里有一个改进的建议,使用struct.unpack
来快速分割输入的行。这个方法也展示了更好的异常处理,假设写代码的人也是在运行这段代码,并且在遇到第一个错误时停止是可以接受的。如果要做一个更健壮的实现,能够记录所有行的所有列的错误,那就另当别论了。通常来说,对每一列的错误检查会更全面,比如检查是否有前导符号,但不检查这一列是否包含有效数字,这样就显得有点奇怪了。
import struct
def unpacked_records(self):
cols = self.Columns()
unpack_fmt = ""
sign_checks = []
start = 0
for colx, info in enumerate(cols, 1):
clen = info.columnLength
if clen < 1:
raise ValueError("Column %d: Bad columnLength %r" % (colx, clen))
if info.skipColumn:
unpack_fmt += str(clen) + "x"
else:
unpack_fmt += str(clen) + "s"
if info.hasSignage:
sign_checks.append(start)
start += clen
expected_len = start
unpack = struct.Struct(unpack_fmt).unpack
for linex, line in enumerate(self.whatever_the_list_of_lines_is, 1):
if len(line) != expected_len:
raise ValueError(
"Line %d: Actual length %d, expected %d"
% (linex, len(line), expected_len))
if not all(line[i] in '+-' for i in sign_checks):
raise ValueError("Line %d: At least one column fails sign check" % linex)
yield unpack(line) # a tuple
编辑:我稍微修改了一下这个回答。原来的回答我会留在下面。
在我之前的回答中,我提到最好能找到一个内置的Python模块来处理解包的工作。我当时想不起来,但也许我应该去谷歌搜索一下。@John Machin 提供了一个答案,展示了如何使用Python的 struct
模块来实现。因为这个模块是用C语言写的,所以它应该比我纯Python的解决方案要快。(不过我并没有实际测量过,所以这只是个猜测。)
我同意原始代码的逻辑有点“不够Python风格”。返回一个哨兵值并不是最好的做法;更好的方式是返回一个有效的值或者抛出一个异常。另一种做法是返回一个有效值的列表,再加上一个无效值的列表。由于@John Machin提供了生成有效值的代码,我想在这里写一个版本,返回两个列表。
注意:也许最好的答案是修改@John Machin的答案,把无效值保存到一个文件中,以便以后查看。他的答案是逐个返回结果,所以不需要构建一个大的解析记录列表;而将错误的行保存到磁盘上,就不需要构建一个可能很大的错误行列表。
import struct
def parse_records(self):
"""
returns a tuple: (good, bad)
good is a list of valid records (as tuples)
bad is a list of tuples: (line_num, line, err)
"""
cols = self.Columns()
unpack_fmt = ""
sign_checks = []
start = 0
for colx, info in enumerate(cols, 1):
clen = info.columnLength
if clen < 1:
raise ValueError("Column %d: Bad columnLength %r" % (colx, clen))
if info.skipColumn:
unpack_fmt += str(clen) + "x"
else:
unpack_fmt += str(clen) + "s"
if info.hasSignage:
sign_checks.append(start)
start += clen
expected_len = start
unpack = struct.Struct(unpack_fmt).unpack
good = []
bad = []
for line_num, line in enumerate(self.whatever_the_list_of_lines_is, 1):
if len(line) != expected_len:
bad.append((line_num, line, "bad length"))
continue
if not all(line[i] in '+-' for i in sign_checks):
bad.append((line_num, line, "sign check failed"))
continue
good.append(unpack(line))
return good, bad
原始回答内容:
如果 self.Columns()
的信息在所有记录中都是相同的,这个回答应该会快很多。我们只处理一次 self.Columns()
的信息,然后构建几个只包含处理记录所需内容的列表。
这段代码展示了如何计算 parsedList
,但并没有实际返回它或者做其他任何事情。显然,你需要对此进行修改。
def parse_records(self):
cols = self.Columns()
slices = []
sign_checks = []
start = 0
for info in cols:
if info.columnLength < 1:
raise ValueError, "bad columnLength"
end = start + info.columnLength
if not info.skipColumn:
tup = (start, end)
slices.append(tup)
if info.hasSignage:
sign_checks.append(start)
expected_len = end # or use (end - 1) to not count a newline
try:
for line in self.whatever_the_list_of_lines_is:
if len(line) != expected_len:
raise ValueError, "wrong length"
if not all(line[i] in '+-' for i in sign_checks):
raise ValueError, "wrong input"
parsedLine = [line[s:e] for s, e in slices]
except ValueError:
parsedLine = False