生成器是线程安全的吗?

50 投票
6 回答
20779 浏览
提问于 2025-04-15 12:55

我有一个多线程的程序,我创建了一个生成器函数,然后把它传给新的线程。我希望这个生成器是共享的,也就是说每个线程都能从这个生成器中获取下一个值。

这样使用生成器安全吗?还是说多个线程同时访问这个共享生成器会出现问题?

如果不安全,那有没有更好的解决办法?我需要一种方法,可以循环遍历一个列表,给调用它的线程提供下一个值。

6 个回答

6

不,它们不是线程安全的。你可以在下面的链接找到关于生成器和多线程的一些有趣信息:

http://www.dabeaz.com/generators/Generators.pdf

55

编辑补充了下面的基准测试。

你可以用锁来包裹一个生成器。例如,

import threading
class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)

在我的系统上,锁的操作需要50毫秒,而队列的操作需要350毫秒。当你确实需要一个队列时,使用队列是有用的;比如说,当你有进来的HTTP请求时,你想把它们放到队列里,让工作线程去处理。(这不符合Python的迭代器模型——一旦迭代器里的项目用完了,它就结束了。)如果你确实有一个迭代器,那么使用LockedIterator是一种更快、更简单的方法来确保它是线程安全的。

from datetime import datetime
import threading
num_worker_threads = 4

class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

def test_locked(it):
    it = LockedIterator(it)
    def worker():
        try:
            for i in it:
                pass
        except Exception, e:
            print e
            raise

    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

def test_queue(it):
    from Queue import Queue
    def worker():
        try:
            while True:
                item = q.get()
                q.task_done()
        except Exception, e:
            print e
            raise

    q = Queue()
    for i in range(num_worker_threads):
         t = threading.Thread(target=worker)
         t.setDaemon(True)
         t.start()

    t1 = datetime.now()

    for item in it:
        q.put(item)

    q.join()

start_time = datetime.now()
it = [x*2 for x in range(1,10000)]

test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)
63

这个不是线程安全的;如果同时有多个调用,可能会交错进行,导致本地变量出现问题。

常见的做法是使用主从模式(现在在个人电脑上叫做农民-工人模式)。你可以创建一个第三个线程来生成数据,然后在主线程和从线程之间加一个队列,从线程会从这个队列中读取数据,而主线程则往队列里写数据。标准的队列模块提供了必要的线程安全性,并且会让主线程在从线程准备好读取更多数据之前处于阻塞状态。

撰写回答