itertools.tee()的结果是线程安全的吗?
假设我有这段Python代码:
from itertools import count, tee
original = count() # just an example, can be another iterable
a, b = tee(original)
我的问题是,如果我在一个线程中开始遍历"a",同时在另一个线程中遍历"b",会不会出现什么问题?很明显,a和b共享了一些数据(比如原始的可迭代对象,还有一些额外的东西,内部缓存之类的)。那么,当a.next()和b.next()访问这些共享数据时,它们会适当地进行锁定吗?
5 个回答
如果文档中显示的等效代码是正确的,链接在这里:
那么答案是否定的,它不会是线程安全的。
需要注意的是,虽然deque被说明是线程安全的,可以安全地添加和删除元素,但这并不意味着使用它的代码也是安全的。
因为主代码可能会在多个线程中请求底层迭代器的元素,所以为了让tee安全,你需要确保输入的集合和迭代器都是线程安全的。
更新! 在较新的 Python 2.7、3.7、3.8 及以上版本中,tee 引起的段错误问题已经修复。不过,你仍然需要自己管理并发访问,以确保线程安全。你可以参考我下面的解决方案。
简而言之
在 CPython 中,`itertools.tee` 只有在原始迭代器是用 C/C++ 实现的情况下才是线程安全的,也就是说,不能使用 **任何** Python 代码。如果原始迭代器 it
是用 Python 编写的,比如类实例或生成器,那么 itertools.tee(it)
就不是线程安全的。在最好的情况下,你只会遇到一个异常(你肯定会遇到),在最坏的情况下,Python 会崩溃。
与其使用 tee
,不如使用下面这个线程安全的包装类和函数:
class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
接下来我会详细讲解 tee
什么时候是线程安全的,什么时候不是,以及原因。
可以正常工作的例子
我们来运行一些代码(这是 Python 3 的代码,如果是 Python 2,请用 `itertools.izip` 替代 `zip` 以获得相同的行为):>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
... s = 0
... for elem, _ in zip(it, range(1000000)):
... s += elem
... print(elem)
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999
itertools.count 完全是用 C 编写的,位于 CPython 项目的 Modules/itertoolsmodule.c
文件中,所以它工作得很好。
列表、元组、集合、范围、字典(键、值和项)、collections.defaultdict
(键、值和项)等也是如此。
不工作示例 - 生成器
一个非常简单的例子是使用生成器:>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
是的,tee
是用 C 编写的,而且 GIL(全局解释器锁)一次只执行一条字节码。但上面的例子表明,这并不足以确保线程安全。在这个过程中发生了以下情况:
- 两个线程对它们的 tee_object 实例调用了相同次数的
next
, - 线程 1 调用
next(a)
, - 它需要获取一个新元素,所以线程 1 现在调用
next(gen)
, gen
是用 Python 编写的。在,比如说,gen.__next__
的第一条字节码上,CPython 决定切换线程,- 线程 2 恢复并调用
next(b)
, - 它需要获取一个新元素,所以它调用
next(gen)
- 由于
gen.__next__
已经在线程 1 中运行,我们就会遇到异常。
不工作示例 - 迭代器对象
好吧,也许在 `tee` 中使用生成器本身就不安全。然后我们运行一个使用迭代器对象的变体:>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
上述代码在 Python 2.7.13 和 3.6(可能所有 CPython 版本)中崩溃,在 Ubuntu、Windows 7 和 OSX 上也是如此。我暂时不想揭示原因,再等一步。
如果我在迭代器中使用锁呢?
也许上述代码崩溃是因为我们的迭代器本身不安全。让我们加一个锁,看看会发生什么:>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
在我们的迭代器中添加一个锁并不足以让 tee
变得线程安全。
为什么 tee 不是线程安全的
问题的关键在于 CPython 中 teedataobject
的 getitem
方法,位于 Modules/itertoolsmodule.c
文件中。tee
的实现非常酷,采用了一种节省内存的优化:tee
返回“tee 对象”,每个对象保存一个指向头部 teedataobject
的引用。这些对象就像链表中的链接,但它们不是保存单个元素,而是保存 57 个。这对我们来说并不重要,但就是这样。以下是 teedataobject
的 getitem
函数:
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
当请求一个元素时,teedataobject
会检查是否有准备好的元素。如果有,它就返回这个元素。如果没有,它就会调用原始迭代器的 next
。如果迭代器是用 Python 编写的,这里可能会出现挂起的情况。所以问题在于:
- 两个线程调用了相同次数的
next
, - 线程 1 调用
next(a)
,C 代码到达PyIter_Next
调用。比如说,在next(gen)
的第一条字节码上,CPython 决定切换线程。 - 线程 2 调用
next(b)
,由于它仍然需要一个新元素,C 代码也到达PyIter_Next
调用,
此时两个线程都在同一个位置,i
和 tdo->numread
的值是相同的。注意 tdo->numread
是一个变量,用来跟踪 teedataobject
应该写入的 57 个单元中的位置。
线程 2 完成了对
PyIter_Next
的调用并返回了一个元素。此时 CPython 决定再次切换线程,线程 1 恢复,完成了对
PyIter_Next
的调用,然后运行以下两行:tdo->numread++; tdo->values[i] = value;
但线程 2 已经设置了
tdo->values[i]
!
这已经足够证明 tee
不是线程安全的,因为我们丢失了线程 2 放入 tdo->values[i]
的值。但这并没有解释崩溃的原因。
假设 i
是 56。由于两个线程都调用了 tdo->numread++
,它现在变成了 58 - 超过了 57,tdo->values
的分配大小。在线程 1 继续执行后,tdo
对象没有更多的引用,准备被删除。以下是 teedataobject
的清除函数:
static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
在标记为“问题”的那一行,CPython 会尝试清除 tdo->values[57]
。这就是崩溃发生的地方。好吧,有时是这样。崩溃的地方不止一个,我只是想展示一个。
现在你知道了 - itertools.tee
不是线程安全的。
一个解决方案 - 外部锁
与其在迭代器的 __next__
中加锁,不如把锁放在 tee.__next__
周围。这意味着每次调用 teedataobject.__getitem__
方法时,只有一个线程会执行。我在这个回答的开头提供了一个简短的实现。这是一个线程安全的 tee
的替代方案。唯一没有实现的功能是序列化(pickling),因为锁不能被序列化,这样添加就不简单了。不过,当然,这是可以做到的。
在C-Python中,itertools.tee()
这个函数和它返回的迭代器是用C语言写的。这意味着全局解释器锁(GIL)会保护它,防止多个线程同时调用。这样做可能会让它正常工作,也不会让解释器崩溃,但并不能保证它是线程安全的。
简单来说,就是别冒这个险。