在Python中合并惰性流(使用生成器)

2 投票
2 回答
2661 浏览
提问于 2025-04-17 14:26

我在玩Python 3的函数功能,尝试实现一个经典算法来计算哈明数。哈明数就是那些只包含2、3或5作为质因数的数字。最初的哈明数有2、3、4、5、6、8、10、12、15、16、18、20等等。

我的实现代码如下:

def scale(s, m):
    return (x*m for x in s)

def merge(s1, s2):
    it1, it2 = iter(s1), iter(s2)
    x1, x2 = next(it1), next(it2)
    if x1 < x2:
        x = x1
        it = iter(merge(it1, s2))
    elif x1 > x2:
        x = x2
        it = iter(merge(s1, it2))
    else:
        x = x1
        it = iter(merge(it1, it2))
    yield x
    while True: yield next(it)

def integers():
    n = 0
    while True:
        n += 1
        yield n

m2 = scale(integers(), 2)
m3 = scale(integers(), 3)
m5 = scale(integers(), 5)

m23 = merge(m2, m3)

hamming_numbers = merge(m23, m5)

问题是合并操作似乎根本不管用。在此之前,我用同样的方式实现了埃拉托斯特尼筛法,它运行得非常好:

def sieve(s):
    it = iter(s)
    x = next(it)
    yield x
    it = iter(sieve(filter(lambda y: x % y, it)))
    while True: yield next(it)

这个算法使用了和我的合并操作相同的技术。所以我看不出有什么不同。你们有什么想法吗?

(我知道这些算法可以用其他方法实现,但我的目标是理解Python的生成器和纯函数能力,包括递归,而不使用类声明或特殊的内置Python函数。)

更新:对于Will Ness,这是我在LISP(实际上是Racket)中实现的这个算法:

(define (scale str m)
  (stream-map (lambda (x) (* x m)) str))

(define (integers-from n)
  (stream-cons n
               (integers-from (+ n 1))))

(define (merge s1 s2)
  (let ((x1 (stream-first s1))
        (x2 (stream-first s2)))
    (cond ((< x1 x2)
           (stream-cons x1 (merge (stream-rest s1) s2)))
          ((> x1 x2)
           (stream-cons x2 (merge s1 (stream-rest s2))))
          (else
           (stream-cons x1 (merge (stream-rest s1) (stream-rest s2)))))))


(define integers (integers-from 1))

(define hamming-numbers
  (stream-cons 1 (merge (scale hamming-numbers 2)
                        (merge (scale hamming-numbers 3)
                               (scale hamming-numbers 5)))))

2 个回答

1
from heapq import heappush, heappop

def hamming_numbers(n):
    ans = [1]

    last = 0
    count = 0

    while count < n:
        x = heappop(ans)

        if x > last:
            yield x

            last = x
            count += 1

            heappush(ans, 2* x)
            heappush(ans, 3* x)
            heappush(ans, 5* x)
>>> n  = 20
>>> print(list(hamming_numbers(20)))
   [1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36]

我想提出一个不同的方法 - 使用Python中的heapq(也叫min-heapq)配合生成器(也就是懒惰计算),前提是你不一定要保留merge()这个函数。

3

你的算法有问题。你的 m2, m3, m5 应该是用来调整 hamming_numbers,而不是 integers

主要的问题在于:你的 merge() 函数对两个参数都不加条件地调用了 next(),所以两个参数都会向前移动一步。比如说,当它生成第一个数字,比如 2(来自 m23 生成器)时,下一次调用时,第一个参数会变成 4(,6,8,...),而第二个参数会变成 6(,9,12,...)。这样 3 就已经被跳过了。所以它总是取出两个参数,并且总是返回第一个参数的头部(可以参考这个链接:http://ideone.com/doeX2Q)。

调用 iter() 是多余的,这里没有任何帮助。当我把它去掉后(http://ideone.com/7tk85h),程序的运行效果和输出结果完全一样(依然是错误的)。通常情况下,iter() 是用来创建一个懒加载的迭代器对象,但这里的参数已经是生成器了。

在你的 sieve() 函数中也没有必要调用 iter()http://ideone.com/kYh7Di)。sieve() 已经定义了一个生成器,而 Python 3 中的 filter() 函数可以从一个函数和一个可迭代对象(生成器本身就是可迭代的)创建一个迭代器。你可以参考这个链接了解更多:Python 的生成器和迭代器的区别

我们可以这样来合并:

def merge(s1, s2):
  x1, x2 = next(s1), next(s2)
  while True:
    if x1 < x2:
        yield x1
        x1 = next(s1)
    elif x1 > x2:
        yield x2
        x2 = next(s2)
    else:
        yield x1
        x1, x2 = next(s1), next(s2)

实际上,在定义 sieve() 函数时,递归并不是必需的。事实上,它只是掩盖了代码中的一个巨大缺陷。它生成的任何素数都会被所有小于它的素数进行测试,但实际上只需要小于它平方根的素数。我们可以很容易地用非递归的方式来修复这个问题(http://ideone.com/Qaycpe):

def sieve(s):    # call as: sieve( integers_from(2))
    x = next(s)  
    yield x
    ps = sieve( integers_from(2))           # independent primes supply
    p = next(ps) 
    q = p*p       ; print((p,q))
    while True:
        x = next(s)
        while x<q: 
            yield x
            x = next(s)
        # here x == q
        s = filter(lambda y,p=p: y % p, s)  # filter creation postponed 
        p = next(ps)                        #   until square of p seen in input
        q = p*p 

(*)(其实,这也是递归的,但方式和之前很不一样)

现在这样效率高多了(还可以参考:解释这段输出素数流的 Haskell 代码)。

无论是递归还是非递归,只是代码的一种语法特征。实际运行时的结构是相同的——filter() 适配器被放在输入流的上方——要么在合适的时机,要么太早(这样我们会得到太多的适配器)。

撰写回答