优化这个Python日志解析代码

6 投票
7 回答
3526 浏览
提问于 2025-04-16 17:04

我在我的笔记本上运行这段代码,处理一个4.2 GB的输入文件,花了48秒。这个输入文件是用制表符分隔的,每个值都用引号包起来。每条记录以换行符结束,比如 '"val1"\t"val2"\t"val3"\t..."valn"\n'

我尝试用10个线程进行多进程处理:一个线程用来排队这些行,8个线程用来解析每一行并填充输出队列,最后一个线程用来把输出队列合并成下面显示的defaultdict,但这段代码运行了300秒,时间是之前的6倍多:

from collections import defaultdict
def get_users(log):
    users = defaultdict(int)
    f = open(log)
    # Read header line
    h = f.readline().strip().replace('"', '').split('\t')
    ix_profile = h.index('profile.type')
    ix_user = h.index('profile.id')
    # If either ix_* is the last field in h, it will include a newline. 
    # That's fine for now.
    for (i, line) in enumerate(f): 
        if i % 1000000 == 0: print "Line %d" % i # progress notification

        l = line.split('\t')
        if l[ix_profile] != '"7"': # "7" indicates a bad value
            # use list slicing to remove quotes
            users[l[ix_user][1:-1]] += 1 

    f.close()
    return users

我检查过自己并没有受到I/O限制,只保留了for循环中的打印语句。那段代码运行了9秒,我认为这是这段代码能达到的最低运行时间。

我有很多这样的5 GB文件需要处理,所以即使是很小的运行时间改进(我知道,我可以去掉打印!)也会有帮助。我运行的机器有4个核心,所以我不禁想,是否有办法让多线程/多进程的代码比上面的代码运行得更快。

更新:

我重新编写了多进程代码,如下所示:

from multiprocessing import Pool, cpu_count
from collections import defaultdict

def parse(line, ix_profile=10, ix_user=9):
    """ix_profile and ix_user predetermined; hard-coding for expedience."""
    l = line.split('\t')
    if l[ix_profile] != '"7"':
        return l[ix_user][1:-1]

def get_users_mp():
    f = open('20110201.txt')
    h = f.readline() # remove header line
    pool = Pool(processes=cpu_count())
    result_iter = pool.imap_unordered(parse, f, 100)
    users = defaultdict(int)
    for r in result_iter:
        if r is not None:
            users[r] += 1
    return users

它运行了26秒,速度提升了1.85倍。还不错,但考虑到有4个核心,效果没有我预期的那么好。

7 个回答

1

因为你的日志文件是用制表符分隔的,所以你可以使用 csv 模块,并加上 dialect='excel-tab' 这个参数,这样可以让你的代码运行得更快,也更容易阅读。当然,这个前提是你必须用Python,而不是那些更快的命令行指令。

2

如果你在使用unix或者cygwin,下面这个小脚本可以帮你计算出用户ID的出现频率,前提是这些用户的个人资料不等于7。这个过程应该很快。

更新了使用awk来统计用户ID的数量

#!/bin/bash

FILENAME="test.txt"

IX_PROFILE=`head -1 ${FILENAME} | sed -e 's/\t/\n/g' | nl -w 1 | grep profile.type | cut -f1`
IX_USER=`head -1 ${FILENAME} | sed -e 's/\t/\n/g' | nl -w 1 | grep profile.id | cut -f1`
# Just the userids
# sed 1d ${FILENAME} | cut -f${IX_PROFILE},${IX_USER} | grep -v \"7\" | cut -f2

# userids counted:
# sed 1d ${FILENAME} | cut -f${IX_PROFILE},${IX_USER} | grep -v \"7\" | cut -f2 | sort | uniq -c

# Count using awk..?
sed 1d ${FILENAME} | cut -f${IX_PROFILE},${IX_USER} | grep -v \"7\" | cut -f2 | awk '{ count[$1]++; } END { for (x in count) { print x "\t" count[x] } }'
4

使用正则表达式。

测试发现,处理过程中的一个耗时部分是调用 str.split()。这可能是因为每一行都需要构建一个列表和一堆字符串对象,这样做很耗费资源。

首先,你需要构建一个正则表达式来匹配这一行。可以像这样:

expression = re.compile(r'("[^"]")\t("[^"]")\t')

如果你调用 expression.match(line).groups(),你会得到前两列提取出来的两个字符串对象,你可以直接对这些进行逻辑处理。

现在,这里假设你关注的两列是第一和第二列。如果不是,你只需要调整正则表达式以匹配正确的列。你的代码会检查表头,以确定列的位置。你可以根据这个生成正则表达式,但我猜这些列的位置其实总是固定的。只需确认它们仍然在那儿,然后在行上使用正则表达式。

编辑

from collections import defaultdict
import re

def get_users(log):
    f = open(log)
    # Read header line
    h = f.readline().strip().replace('\'', '').split('\t')
    ix_profile = h.index('profile.type')
    ix_user = h.index('profile.id')

    assert ix_user < ix_profile

这段代码假设用户在个人资料之前

    keep_field = r'"([^"]*)"'

这个正则表达式将捕获单个列

    skip_field = r'"[^"]*"'

这个正则表达式会匹配列,但不会捕获结果。(注意没有括号)

    fields = [skip_field] * len(h)
    fields[ix_profile] = keep_field
    fields[ix_user] = keep_field

创建一个包含所有字段的列表,只保留我们关心的字段

    del fields[max(ix_profile, ix_user)+1:]

去掉我们不关心的字段(它们匹配时会消耗时间,而我们不需要它们)

    regex = re.compile(r"\t".join(fields))

实际生成正则表达式。

    users = defaultdict(int)
    for line in f:
        user, profile = regex.match(line).groups()

提取出这两个值,然后进行逻辑处理

        if profile != "7": # "7" indicates a bad value
            # use list slicing to remove quotes
            users[user] += 1 

    f.close()
    return users

撰写回答