Abusing yield to avoid a condition in a loop
I need to find the first, last, any, or all occurrences of something in something else. To avoid repeating myself (DRY) I came up with the solution below. Of interest are the methods search_revisions() and collect_one_occurence() of the two Searcher classes.

In SearcherYield I create a generator in search_revisions(), only to abandon that generator in collect_one_occurence() after collecting the first result. In SearcherCondition I put a condition inside the loop, and that condition has to be checked on every iteration.

I can't decide whether my use of yield followed by abandoning the generator is a clever trick or an ugly hack. What do you think? Do you have other ideas for handling this situation?
#!/usr/bin/python

class Revision:
    # a revision is something like a textfile.
    # the search() method will search the textfile
    # and return the lines which match the given pattern.
    # for demonstration purposes this class is simplified
    # to return predefined results
    def __init__(self, results):
        self.results = results
    def search(self, pattern):
        return self.results

class AbstractSearcher:
    def __init__(self, revisions):
        self.revisions = revisions
    def search_for_first_occurence(self, pattern):
        keys = sorted(self.revisions.iterkeys())
        return self.collect_one_occurence(keys, pattern)
    def search_for_last_occurence(self, pattern):
        keys = sorted(self.revisions.iterkeys(), reverse = True)
        return self.collect_one_occurence(keys, pattern)
    def search_for_any_occurence(self, pattern):
        keys = self.revisions.iterkeys()
        return self.collect_one_occurence(keys, pattern)
    def search_for_all_occurences(self, pattern):
        keys = self.revisions.iterkeys()
        return self.collect_all_occurences(keys, pattern)

class SearcherYield(AbstractSearcher):
    def search_revisions(self, keys, pattern):
        # create generator which yields the results one by one
        for key in keys:
            rev = self.revisions[key]
            result = rev.search(pattern)
            if result:
                yield result
    def collect_one_occurence(self, keys, pattern):
        # take the first result and then abandon the generator
        for result in self.search_revisions(keys, pattern):
            return result
        return []
    def collect_all_occurences(self, keys, pattern):
        # collect all results from generator
        results = []
        for result in self.search_revisions(keys, pattern):
            results.extend(result)
        return results

class SearcherCondition(AbstractSearcher):
    def search_revisions(self, keys, pattern, just_one):
        # collect either all results from all revisions
        # or break the loop after first result found
        results = []
        for key in keys:
            rev = self.revisions[key]
            result = rev.search(pattern)
            if result:
                results.extend(result)
                if just_one:
                    break
        return results
    def collect_one_occurence(self, keys, pattern):
        return self.search_revisions(keys, pattern, just_one = True)
    def collect_all_occurences(self, keys, pattern):
        return self.search_revisions(keys, pattern, just_one = False)

def demo(searcher):
    print searcher.__class__.__name__
    print 'first:', searcher.search_for_first_occurence('foo')
    print 'last: ', searcher.search_for_last_occurence('foo')
    print 'any: ', searcher.search_for_any_occurence('foo')
    print 'all: ', searcher.search_for_all_occurences('foo')

def main():
    revisions = {
        1: Revision([]),
        2: Revision(['a', 'b']),
        3: Revision(['c']),
        4: Revision(['d', 'e', 'f']),
        5: Revision([])}
    demo(SearcherYield(revisions))
    demo(SearcherCondition(revisions))

if __name__ == '__main__':
    main()
Some context: the revisions are basically text files. You can think of them as the revisions of a wiki page. Typically there are hundreds of revisions, sometimes thousands. Each revision can contain thousands of lines of text; sometimes a revision has only a few lines.

A search in a revision looks for a pattern in the text and returns the matching lines. Sometimes there are thousands of results, sometimes there are none.

Sometimes I only need to know whether there is a result in any revision at all (find any). Sometimes I have to collect all results for further processing (find all). Sometimes I only need the first revision with a match, and sometimes only the last (find first and find last).
3 Answers
Personally I find the version with yield easier to read, though that is not a clear-cut call. I have no particular argument for it; it just strikes me as a nice structure that fits many situations.

As you probably already know, the code will eventually need to tell the caller which revision matched. The simplest change is to have the revision search return a reference to the revision along with its results.
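A minimal sketch of that idea, assuming the SearcherYield class from the question (the class name and the key-plus-lines tuple are just one possible way to hand the "link" to the revision back to the caller):

class SearcherYieldWithRevision(SearcherYield):
    # hypothetical variant: report which revision produced the match
    def search_revisions(self, keys, pattern):
        for key in keys:
            result = self.revisions[key].search(pattern)
            if result:
                # yield the revision key (the link back to the revision)
                # together with the matching lines
                yield key, result
    def collect_one_occurence(self, keys, pattern):
        for key, result in self.search_revisions(keys, pattern):
            return key, result
        return None, []

collect_all_occurences() would need a matching adjustment if the callers of "find all" also care about which revision each line came from.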
You could also combine yield with Python's itertools to shrink the code further. It arguably costs some readability, but the effect is pretty cool:
from itertools import chain, repeat, islice, ifilter

def collect_one_occurence(self, keys, pattern):
    return chain(ifilter(None, (rev.search(pattern) for rev in (self.revisions[key] for key in keys))), repeat([])).next()

def collect_all_occurences(self, keys, pattern):
    return list(chain(*[rev.search(pattern) for rev in (self.revisions[key] for key in keys)]))
Of course you could expand this code to make it more readable; I kept it terse for benchmarking. I'm curious how much this would actually improve on your current results.
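For readability, the one-liner for collect_one_occurence can be unpacked into named steps. This is just a sketch of the equivalent, assuming the same class and Python 2's itertools:

from itertools import chain, ifilter, repeat

def collect_one_occurence(self, keys, pattern):
    # one list of matching lines per revision, in key order
    all_results = (self.revisions[key].search(pattern) for key in keys)
    # keep only the non-empty result lists
    hits = ifilter(None, all_results)
    # append an endless supply of [] as a fallback, then take the first
    # element: the first hit, or [] if no revision matched
    return chain(hits, repeat([])).next()

Because everything up to the final .next() is lazy, this stops searching revisions as soon as the first hit is produced, just like the original generator version.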
If your lookup item is immutable and collec is any kind of ordered collection, this will do the trick:

positions = [i for i, item in enumerate(collec) if item == lookup_item]

It returns all positions at which the lookup item occurs in collec.
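Taking first, last, or any from that positions list is then just indexing; a minimal usage sketch with hypothetical values for collec and lookup_item:

collec = ['x', 'foo', 'y', 'foo', 'z']   # hypothetical ordered collection
lookup_item = 'foo'

positions = [i for i, item in enumerate(collec) if item == lookup_item]

first = positions[0] if positions else None    # 1
last = positions[-1] if positions else None    # 3
any_hit = bool(positions)                      # True

Note that this walks the whole collection even when only the first occurrence is needed, which is exactly the cost the question is trying to avoid for the "find first" and "find any" cases.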
I ran a benchmark. Here are the results:
$ ./benchmark.py
benchmark with revcount: 1000 timeitcount: 1000
last, first, yield: 0.902059793472
last, first, cond: 0.897155046463
last, all, yield: 0.818709135056
last, all, cond: 0.818334102631
all, all, yield: 1.26602506638
all, all, cond: 1.17208003998
benchmark with revcount: 2000 timeitcount: 1000
last, first, yield: 1.80768609047
last, first, cond: 1.84234118462
last, all, yield: 1.64661192894
last, all, cond: 1.67588806152
all, all, yield: 2.55621600151
all, all, cond: 2.37582707405
benchmark with revcount: 10000 timeitcount: 1000
last, first, yield: 9.34304785728
last, first, cond: 9.33725094795
last, all, yield: 8.4673140049
last, all, cond: 8.49153590202
all, all, yield: 12.9636368752
all, all, cond: 11.780673027
The times for the yield (generator) solution and the condition solution are very similar. I think that is because the generator also has a loop with a condition inside it (the check for an empty result). I thought I had moved the condition out of the loop, but I had really only moved it out of sight.

Either way, the numbers show that the two approaches perform about the same, so the choice should come down to readability. I will go with the condition in the loop. I like explicit.
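That said, if anyone prefers to keep the generator, the built-in next() with a default argument (available since Python 2.6, which is what I am running) gives a compact way to take the first result and abandon the generator without the explicit for/return loop. A sketch against the SearcherYield class above:

def collect_one_occurence(self, keys, pattern):
    # take the first yielded result, or [] if the generator finishes
    # without yielding anything; the partially consumed generator is
    # simply dropped and garbage collected
    return next(self.search_revisions(keys, pattern), [])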
Here is my benchmark code:
#!/usr/bin/python

import functools
import timeit

class Revision:
    # a revision is something like a textfile.
    # the search() method will search the textfile
    # and return the lines which match the given pattern.
    # for demonstration purposes this class is simplified
    # to return predefined results
    def __init__(self, results):
        self.results = results
    def search(self, pattern):
        return self.results

class AbstractSearcher:
    def __init__(self, revisions):
        self.revisions = revisions
    def search_for_first_occurence(self, pattern):
        keys = sorted(self.revisions.iterkeys())
        return self.collect_one_occurence(keys, pattern)
    def search_for_last_occurence(self, pattern):
        keys = sorted(self.revisions.iterkeys(), reverse = True)
        return self.collect_one_occurence(keys, pattern)
    def search_for_any_occurence(self, pattern):
        keys = self.revisions.iterkeys()
        return self.collect_one_occurence(keys, pattern)
    def search_for_all_occurences(self, pattern):
        keys = self.revisions.iterkeys()
        return self.collect_all_occurences(keys, pattern)

class SearcherYield(AbstractSearcher):
    def search_revisions(self, keys, pattern):
        # create generator which yields the results one by one
        for key in keys:
            rev = self.revisions[key]
            result = rev.search(pattern)
            if result:
                yield result
    def collect_one_occurence(self, keys, pattern):
        # take the first result and then abandon the generator
        for result in self.search_revisions(keys, pattern):
            return result
        return []
    def collect_all_occurences(self, keys, pattern):
        # collect all results from generator
        results = []
        for result in self.search_revisions(keys, pattern):
            results.extend(result)
        return results

class SearcherCondition(AbstractSearcher):
    def search_revisions(self, keys, pattern, just_one):
        # collect either all results from all revisions
        # or break the loop after first result found
        results = []
        for key in keys:
            rev = self.revisions[key]
            result = rev.search(pattern)
            if result:
                results.extend(result)
                if just_one:
                    break
        return results
    def collect_one_occurence(self, keys, pattern):
        return self.search_revisions(keys, pattern, just_one = True)
    def collect_all_occurences(self, keys, pattern):
        return self.search_revisions(keys, pattern, just_one = False)

def benchmark(revcount, timeitcount):
    lastrev = {}
    for i in range(revcount):
        lastrev[i] = Revision([])
    lastrev[revcount] = Revision([1])
    allrevs = {}
    for i in range(revcount):
        allrevs[i] = Revision([1])
    last_yield = SearcherYield(lastrev)
    last_cond = SearcherCondition(lastrev)
    all_yield = SearcherYield(allrevs)
    all_cond = SearcherCondition(allrevs)
    lfy = functools.partial(last_yield.search_for_first_occurence, 'foo')
    lfc = functools.partial(last_cond.search_for_first_occurence, 'foo')
    lay = functools.partial(last_yield.search_for_all_occurences, 'foo')
    lac = functools.partial(last_cond.search_for_all_occurences, 'foo')
    aay = functools.partial(all_yield.search_for_all_occurences, 'foo')
    aac = functools.partial(all_cond.search_for_all_occurences, 'foo')
    print 'benchmark with revcount: %d timeitcount: %d' % (revcount, timeitcount)
    print 'last, first, yield:', timeit.timeit(lfy, number = timeitcount)
    print 'last, first, cond:', timeit.timeit(lfc, number = timeitcount)
    print 'last, all, yield:', timeit.timeit(lay, number = timeitcount)
    print 'last, all, cond:', timeit.timeit(lac, number = timeitcount)
    print ' all, all, yield:', timeit.timeit(aay, number = timeitcount)
    print ' all, all, cond:', timeit.timeit(aac, number = timeitcount)

def main():
    timeitcount = 1000
    benchmark(1000, timeitcount)
    benchmark(2000, timeitcount)
    benchmark(10000, timeitcount)

if __name__ == '__main__':
    main()
Some information about my system:
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 10.04.1 LTS
Release: 10.04
Codename: lucid
$ uname -a
Linux lesmana-laptop 2.6.32-26-generic #46-Ubuntu SMP Tue Oct 26 16:46:46 UTC 2010 i686 GNU/Linux
$ python --version
Python 2.6.5
$ cat /proc/cpuinfo | grep name
model name : Intel(R) Pentium(R) M processor 1.60GHz