如何提升Python中readline循环的速度?

2 投票
5 回答
3243 浏览
提问于 2025-04-15 14:16

我正在把一个文本格式的数据库备份导入到MySQL中,问题是,在我需要的数据之前,有很多不重要的内容。

我写了一个循环来找到需要的数据:

def readloop(DBFILE):
    txtdb=open(DBFILE, 'r')

sline = ""

# loop till 1st "customernum:" is found
while sline.startswith("customernum:  ") is False: 
    sline = txtdb.readline()

while sline.startswith("customernum:  "):
    data = []
    data.append(sline)
    sline = txtdb.readline()
    while sline.startswith("customernum:  ") is False:
        data.append(sline)
        sline = txtdb.readline()
        if len(sline) == 0:
            break
    customernum = getitem(data, "customernum:  ")
    street = getitem(data, "street:  ")
    country = getitem(data, "country:  ")
    zip = getitem(data, "zip:  ")

这个文本文件非常大,所以光是循环到第一个想要的条目就花了很多时间。有没有人知道有没有更快的方法(或者我现在的做法是不是最好的选择)?

非常感谢大家!

5 个回答

1

我猜你在写这个导入脚本的时候,测试时总是要等,感觉很无聊,所以数据一直保持不变。

你可以先运行一次脚本,使用 print txtdb.tell() 来找出你想跳转到的文件位置。把这些位置记下来,然后把查找代码换成 txtdb.seek( pos )。简单来说,这就是在为文件建立一个索引;-)

另一种更常见的方法是一次读取更大块的数据,比如几兆字节,而不是每次只读取一行的几个字节。

5

请不要写这样的代码:

while condition is False:

布尔条件就是布尔值,所以可以直接测试(或者取反后测试):

while not condition:

你的第二个 while 循环没有写成 "while condition is True:",我很好奇你为什么在第一个循环中需要测试 "is False"。

拿出 dis 模块,我想再深入分析一下。在我使用 pyparsing 的经验中,函数调用会严重影响性能,所以如果能避免函数调用,那就最好了。下面是你原来的测试:

>>> test = lambda t : t.startswith('customernum') is False
>>> dis.dis(test)
  1           0 LOAD_FAST                0 (t)
              3 LOAD_ATTR                0 (startswith)
              6 LOAD_CONST               0 ('customernum')
              9 CALL_FUNCTION            1
             12 LOAD_GLOBAL              1 (False)
             15 COMPARE_OP               8 (is)
             18 RETURN_VALUE

这里发生了两件耗费性能的事情,CALL_FUNCTIONLOAD_GLOBAL。你可以通过为 False 定义一个局部变量来减少 LOAD_GLOBAL 的使用:

>>> test = lambda t,False=False : t.startswith('customernum') is False
>>> dis.dis(test)
  1           0 LOAD_FAST                0 (t)
              3 LOAD_ATTR                0 (startswith)
              6 LOAD_CONST               0 ('customernum')
              9 CALL_FUNCTION            1
             12 LOAD_FAST                1 (False)
             15 COMPARE_OP               8 (is)
             18 RETURN_VALUE

但是如果我们完全去掉 'is' 测试呢?

>>> test = lambda t : not t.startswith('customernum')
>>> dis.dis(test)
  1           0 LOAD_FAST                0 (t)
              3 LOAD_ATTR                0 (startswith)
              6 LOAD_CONST               0 ('customernum')
              9 CALL_FUNCTION            1
             12 UNARY_NOT
             13 RETURN_VALUE

我们用一个简单的 UNARY_NOT 替代了 LOAD_xxxCOMPARE_OP。 "is False" 对性能的帮助并不大。

那么如果我们能在不调用任何函数的情况下,粗略地消除一行代码呢?如果这一行的第一个字符不是 'c',那么它就不可能以 'customernum' 开头。我们试试这样:

>>> test = lambda t : t[0] != 'c' and not t.startswith('customernum')
>>> dis.dis(test)
  1           0 LOAD_FAST                0 (t)
              3 LOAD_CONST               0 (0)
              6 BINARY_SUBSCR
              7 LOAD_CONST               1 ('c')
             10 COMPARE_OP               3 (!=)
             13 JUMP_IF_FALSE           14 (to 30)
             16 POP_TOP
             17 LOAD_FAST                0 (t)
             20 LOAD_ATTR                0 (startswith)
             23 LOAD_CONST               2 ('customernum')
             26 CALL_FUNCTION            1
             29 UNARY_NOT
        >>   30 RETURN_VALUE

(注意,使用 [0] 来获取字符串的第一个字符并不会创建切片——这实际上是非常快的。)

现在,假设以 'c' 开头的行不是很多,这个粗略的过滤器可以用所有相对快速的指令来消除一行。实际上,通过测试 "t[0] != 'c'" 而不是 "not t[0] == 'c'",我们省去了一个多余的 UNARY_NOT 指令。

所以根据这个关于优化的学习,我建议把这段代码改成:

while sline.startswith("customernum:  ") is False:
    sline = txtdb.readline()

while sline.startswith("customernum:  "):
    ... do the rest of the customer data stuff...

变成这样:

for sline in txtdb:
    if sline[0] == 'c' and \ 
       sline.startswith("customernum:  "):
        ... do the rest of the customer data stuff...

注意,我也去掉了 .readline() 的函数调用,而是直接用 "for sline in txtdb" 来遍历文件。

我知道 Alex 提供了一个完全不同的代码来找到第一个 'customernum' 行,但我建议在你的算法的基本框架内进行优化,而不是直接使用那些复杂的块读取方法。

2

优化的基本思路是“按大块来处理”,也就是说大部分时间不去关注每一行的具体结构,先找到我们感兴趣的第一行,然后再逐行处理后面的内容。这种方法有点麻烦,容易出错(比如说多了一行或少了一行),所以需要好好测试一下。不过大致的思路就是这样……:

import itertools

def readloop(DBFILE):
  txtdb=open(DBFILE, 'r')
  tag = "customernum:  "
  BIGBLOCK = 1024 * 1024
  # locate first occurrence of tag at line-start
  # (assumes the VERY FIRST line doesn't start that way,
  # else you need a special-case and slight refactoring)
  blob = ''
  while True:
    blob = blob + txtdb.read(BIGBLOCK)
    if not blob:
      # tag not present at all -- warn about that, then
      return
    where = blob.find('\n' + tag)
    if where != -1:  # found it!
      blob = blob[where+1:] + txtdb.readline()
      break
    blob = blob[-len(tag):]
  # now make a by-line iterator over the part of interest
  thelines = itertools.chain(blob.splitlines(1), txtdb)
  sline = next(thelines, '')
  while sline.startswith(tag):
    data = []
    data.append(sline)
    sline = next(thelines, '')
    while not sline.startswith(tag):
      data.append(sline)
      sline = next(thelines, '')
      if not sline:
        break
    customernum = getitem(data, "customernum:  ")
    street = getitem(data, "street:  ")
    country = getitem(data, "country:  ")
    zip = getitem(data, "zip:  ")

在这里,我尽量保持你原有的结构,只做了一些小的改进,主要是围绕这个优化的“大方向”进行的调整。

撰写回答