生成器是线程安全的吗?
我有一个多线程的程序,我创建了一个生成器函数,然后把它传给新的线程。我希望这个生成器是共享的,也就是说每个线程都能从这个生成器中获取下一个值。
这样使用生成器安全吗?还是说多个线程同时访问这个共享生成器会出现问题?
如果不安全,那有没有更好的解决办法?我需要一种方法,可以循环遍历一个列表,给调用它的线程提供下一个值。
6 个回答
6
不,它们不是线程安全的。你可以在下面的链接找到关于生成器和多线程的一些有趣信息:
55
编辑补充了下面的基准测试。
你可以用锁来包裹一个生成器。例如,
import threading
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)
在我的系统上,锁的操作需要50毫秒,而队列的操作需要350毫秒。当你确实需要一个队列时,使用队列是有用的;比如说,当你有进来的HTTP请求时,你想把它们放到队列里,让工作线程去处理。(这不符合Python的迭代器模型——一旦迭代器里的项目用完了,它就结束了。)如果你确实有一个迭代器,那么使用LockedIterator是一种更快、更简单的方法来确保它是线程安全的。
from datetime import datetime
import threading
num_worker_threads = 4
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
def test_locked(it):
it = LockedIterator(it)
def worker():
try:
for i in it:
pass
except Exception, e:
print e
raise
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
def test_queue(it):
from Queue import Queue
def worker():
try:
while True:
item = q.get()
q.task_done()
except Exception, e:
print e
raise
q = Queue()
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.setDaemon(True)
t.start()
t1 = datetime.now()
for item in it:
q.put(item)
q.join()
start_time = datetime.now()
it = [x*2 for x in range(1,10000)]
test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)
63
这个不是线程安全的;如果同时有多个调用,可能会交错进行,导致本地变量出现问题。
常见的做法是使用主从模式(现在在个人电脑上叫做农民-工人模式)。你可以创建一个第三个线程来生成数据,然后在主线程和从线程之间加一个队列,从线程会从这个队列中读取数据,而主线程则往队列里写数据。标准的队列模块提供了必要的线程安全性,并且会让主线程在从线程准备好读取更多数据之前处于阻塞状态。