Python,巨大的迭代性能问题
我正在处理三个单词,每个单词大约有500万个字符。我想找到每个单词中独特的20个字符的序列。也就是说,我想找出在一个单词中长度为20的所有序列,这些序列对于这个单词是独一无二的。我的问题是,我写的代码运行起来非常慢。我甚至从来没有完成过一个单词,程序晚上运行也没用。
下面的这个函数接收一个列表,列表里包含字典,每个字典里有可能的20个字符的单词和它在500万字符长的单词中的位置。
如果有人有优化的想法,我会非常感激,因为我完全不知道该怎么继续下去……
这是我代码的一个示例:
def findUnique(list):
# Takes a list with dictionaries and compairs each element in the dictionaries
# with the others and puts all unique element in new dictionaries and finally
# puts the new dictionaries in a list.
# The result is a list with (in this case) 3 dictionaries containing all unique
# sequences and their locations from each string.
dicList=[]
listlength=len(list)
s=0
valuelist=[]
for i in list:
j=i.values()
valuelist.append(j)
while s<listlength:
currdic=list[s]
dic={}
for key in currdic:
currval=currdic[key]
test=True
n=0
while n<listlength:
if n!=s:
if currval in valuelist[n]: #this is where it takes to much time
n=listlength
test=False
else:
n+=1
else:
n+=1
if test:
dic[key]=currval
dicList.append(dic)
s+=1
return dicList
4 个回答
我们来试着改进一下Roger Pate的精彩回答。
首先,我们可以用集合来代替字典,因为集合本身就能管理唯一性。
其次,由于我们可能会比耗尽CPU时间(和耐心)更快地耗尽内存,所以我们可以为了节省内存而牺牲一些CPU效率。比如,可以尝试只从某个特定字母开始的20个字符。对于DNA来说,这样可以减少75%的需求。
seqlen = 20
maxlength = max([len(word) for word in words])
for startletter in letters:
for letterid in range(maxlength):
for wordid,word in words:
if (letterid < len(word)):
letter = word[letterid]
if letter is startletter:
seq = word[letterid:letterid+seqlen]
if seq in seqtrie and not wordid in seqtrie[seq]:
seqtrie[seq].append(wordid)
或者,如果这仍然占用太多内存,我们可以针对每一个可能的起始对进行处理(对于DNA来说是16次,而不是4次),或者每3个字符处理一次(64次)等等。
你说你有一个长达500万字符的“单词”,但我很难相信这在通常意义上算是一个单词。
如果你能提供更多关于你输入数据的信息,可能会有更具体的解决方案。
比如,英文文本(或者其他任何书写语言)可能会有足够的重复性,这样就可以使用一种叫做字典树的结构。不过在最坏的情况下,它可能会因为构建所有256^20个键而耗尽内存。了解你的输入数据会大有帮助。
编辑
我查看了一些基因组数据,看看这个想法的效果如何,使用了一个硬编码的[acgt]->[0123]映射和每个字典树节点4个子节点。
腺病毒2型: 35,937bp -> 35,899个不同的20碱基序列,使用了469,339个字典树节点。
肠杆菌噬菌体λ: 48,502bp -> 40,921个不同的20碱基序列,使用了529,384个字典树节点。
在这两个数据集中,我没有遇到任何冲突,尽管你的数据可能有更多的冗余和/或重叠。你需要尝试一下才能知道。
如果你确实遇到了一些有用的冲突,可以尝试将三个输入一起处理,构建一个单一的字典树,记录每个叶子的来源,并在过程中修剪掉冲突。
如果你找不到修剪键的方法,可以尝试使用更紧凑的表示方式。例如,你只需要2位来存储[acgt]/[0123],这可能会节省空间,但代码会稍微复杂一些。
不过我觉得你不能单靠蛮力解决这个问题——你需要找到一些方法来缩小问题的规模,而这取决于你的领域知识。
def slices(seq, length, prefer_last=False):
unique = {}
if prefer_last: # this doesn't have to be a parameter, just choose one
for start in xrange(len(seq) - length + 1):
unique[seq[start:start+length]] = start
else: # prefer first
for start in xrange(len(seq) - length, -1, -1):
unique[seq[start:start+length]] = start
return unique
# or find all locations for each slice:
import collections
def slices(seq, length):
unique = collections.defaultdict(list)
for start in xrange(len(seq) - length + 1):
unique[seq[start:start+length]].append(start)
return unique
这个函数(现在在我的 iter_util 模块 中)是 O(n) 的意思是它的运行时间和每个单词的长度成正比。你可以使用 set(slices(..))
(结合一些集合操作,比如差集)来获取所有单词中独特的切片(下面有例子)。如果你不想跟踪位置,也可以把这个函数写成返回一个集合。内存使用会很高(虽然仍然是 O(n),只是一个比较大的系数),如果长度只有 20,可以用一个特殊的 “懒切片”类来减轻这个问题,这个类会存储基础序列(字符串)以及起始和结束位置(或者起始位置和长度)。
打印独特的切片:
a = set(slices("aab", 2)) # {"aa", "ab"}
b = set(slices("abb", 2)) # {"ab", "bb"}
c = set(slices("abc", 2)) # {"ab", "bc"}
all = [a, b, c]
import operator
a_unique = reduce(operator.sub, (x for x in all if x is not a), a)
print a_unique # {"aa"}
包括位置:
a = slices("aab", 2)
b = slices("abb", 2)
c = slices("abc", 2)
all = [a, b, c]
import operator
a_unique = reduce(operator.sub, (set(x) for x in all if x is not a), set(a))
# a_unique is only the keys so far
a_unique = dict((k, a[k]) for k in a_unique)
# now it's a dict of slice -> location(s)
print a_unique # {"aa": 0} or {"aa": [0]}
# (depending on which slices function used)
在一个更接近你条件的测试脚本中,使用随机生成的 500 万字符的单词和 20 的切片长度,内存使用非常高,以至于我的测试脚本很快就达到了 1G 的主内存限制,并开始使用虚拟内存。那时 Python 在 CPU 上几乎没有时间,我不得不终止它。通过减少切片长度或单词长度(因为我使用的是完全随机的单词,这样会减少重复并增加内存使用)来适应主内存,结果在一分钟内就能运行完。这个情况加上你原始代码中的 O(n**2) 会让程序运行得非常慢,这就是为什么算法的时间复杂度和空间复杂度都很重要。
import operator
import random
import string
def slices(seq, length):
unique = {}
for start in xrange(len(seq) - length, -1, -1):
unique[seq[start:start+length]] = start
return unique
def sample_with_repeat(population, length, choice=random.choice):
return "".join(choice(population) for _ in xrange(length))
word_length = 5*1000*1000
words = [sample_with_repeat(string.lowercase, word_length) for _ in xrange(3)]
slice_length = 20
words_slices_sets = [set(slices(x, slice_length)) for x in words]
unique_words_slices = [reduce(operator.sub,
(x for x in words_slices_sets if x is not n),
n)
for n in words_slices_sets]
print [len(x) for x in unique_words_slices]