如何在Python中从gzip压缩文件中获取随机行而不将其读入内存

5 投票
3 回答
2247 浏览
提问于 2025-04-17 13:08

假设我有一个531GB的压缩文本文件,里面有512,548,457,601,475行内容,行与行之间用'\n'分隔。我想从中随机抽取一行,而不想把文件拆分成多个小文件。(别担心,这个文件其实并没有那么大;我只是想说明它很庞大,而且我知道它有多少行。)

如果是处理一个较小的压缩文件,我通常会这样做:

import fileinput
import gzip
import random

list = []

for line in fileinput.input(file, openhook=gzip.open):
    list.append(line)

listLength = len(list)
randomListLineOne = line[random.randint(0, listLength)]
randomListLineTwo = line[random.randint(0, listLength)]
...

关于这个话题,我找到了一些资料:

如何在Python中从一个文件中读取随机行?

import random

def random_line(afile):
    line = next(afile)
    for num, aline in enumerate(afile):
      if random.randrange(num + 2): continue
      line = aline
    return line

Waterman的“水库算法”,由Alex Martelli翻译自Knuth的《计算机程序设计艺术》

你能把这个算法调整一下,让它适用于压缩文件吗?我尝试把我的压缩文件设为afile,但那样不行。或者有没有其他(更简单)的方法可以实现这个目标?

3 个回答

0

抱歉这么晚才回答,但如果你知道文件的大小,可以使用 seek() 方法来定位文件中的位置,这个大小可以通过 gunzip -l 命令得到。
然后,丢掉下一次读取的数据,因为那可能只是一部分行,接下来的读取才是你需要的随机数据。

从一个压缩的文本文件中打印10行随机内容。

import random
import gzip, os
f = gzip.open("some.txt.gz","r")
unc_size = os.popen('gunzip -lq some.txt.gz').read()
unc_size = unc_size.strip().split(" ",1)
unc_size = unc_size[1].strip().split(" ",1)
for x in range(1,11):
    f.seek(random.randint(0,int(unc_size[0])))
    dump = next(f)
    print "Random line from byte pos ",f.tell(), next(f)
f.close() 
2

你可以简单地使用“从一个文件中随机读取一行”的方法,但要用 gzip 这个包来把文件当作gzip文件打开,而不是普通文件。

import gzip
import random

def random_line(afile):
    line = next(afile)
    for num, aline in enumerate(afile):
        if random.randrange(num + 2): continue
        line = aline
    return line

afile = gzip.open("myfile.zip")
print random_line(afile)
afile.close()
5

蒙特卡罗方法

作为一种替代方案,可以不按行读取文件,具体方法可以参考逐行读取文件的方法*

(*可以使用David Robinson的方法将gzip文件当作标准文件来读取):

如果文件中的每一行长度差不多,你可以随机跳到文件中的某个位置,然后一个字符一个字符地往回找,直到找到换行符,然后从那个位置读取整行。如果每一行的长度完全相同,这种方法是准确的。

但如果每一行的长度不一样,而你知道某种分布,比如某一行的长度是x,那么你可以用上面的方法,但要根据概率P(x)来拒绝那些过于频繁的x,这样才能确保从文件中随机抓取一行的概率是恒定的。

举个例子:

为了简单起见,假设你有一个5行的文件,行长度为X={2,3,5,5,5}。随机选择一个点,你有10%的机会得到x1(2/(2+3+5+5+5)),15%的机会得到x2,50%的机会得到x3。你想要的概率分别是20%/20%/60%。相应的权重是W=(3/2, 1, 6/5),这些数字使得x1*w1 = 20%x2*w2 = 20%x3*w3=60%。归一化因子是这些权重的总和Z = w1+w2+w3 = 37/10。这样我们就知道每一行的概率了:

 P(w1) = w1/Z = 30/68
 P(w2) = w2/Z = 20/68
 P(w3) = w3/Z = 18/68

注意P(w1)+P(w2)+3*P(w3)=1,这应该是成立的。

在你的算法中,选择文件中的一个随机点。如果对应的行长度是2,就在q=[0,1]之间随机选择一个数字。如果q>(30/68),就拒绝这个点,重新尝试。如果小于这个值,就停止并返回那一行。

什么时候你知道X(w)

我承认,知道每一行长度的确切分布可能会显得有些限制,但有很多程序生成的文件(比如日志文件、硬件数据读取等)是有确切分布的。此外,如果只大致知道分布,我们也可以用上面的方法来确定样本拒绝标准,作为最佳猜测,然后再继续。

蒙特卡罗方法?

这可能不是最好的方法(谁能和Knuth竞争呢?),但它可能会提供一些全新视角来解决问题。对于不熟悉的人来说,上面的方法是一种重要性抽样,属于蒙特卡罗方法

如何在gzip文件中查找?

根据提问者的要求,这里是关于如何在Python文件对象中使用seek的入门介绍。

import gzip, random

# Helper function to create some test data
def line(char,n): 
    return ''.join([("%s"%char)*n,"\n"])

# Create the test data as in the example
filename = "test.zip"
FOUT = gzip.open(filename,'wb')
FOUT.write(line('a',2))
FOUT.write(line('b',3))
FOUT.write(line('c',5))
FOUT.write(line('d',5))
FOUT.write(line('e',5))
FOUT.close()

# Since we know the distribution, we know the length
length = 2+3+3*5+5 # 5 newlines

# Print 7 random points in the file
FIN = gzip.open(filename,'rb')
for n in xrange(7):
    FIN.seek(random.randrange(length),0)
    print "Position %3i, char: %s" %(FIN.tell(), [FIN.read(1)])

这是一个示例运行的输出:

Position   8, char: ['c']
Position  23, char: ['e']
Position  15, char: ['d']
Position  10, char: ['c']
Position   4, char: ['b']
Position  16, char: ['d']
Position   2, char: ['\n']

撰写回答