itertools.tee()的结果是线程安全的吗?

15 投票
5 回答
1745 浏览
提问于 2025-04-16 21:35

假设我有这段Python代码:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

我的问题是,如果我在一个线程中开始遍历"a",同时在另一个线程中遍历"b",会不会出现什么问题?很明显,a和b共享了一些数据(比如原始的可迭代对象,还有一些额外的东西,内部缓存之类的)。那么,当a.next()和b.next()访问这些共享数据时,它们会适当地进行锁定吗?

5 个回答

2

如果文档中显示的等效代码是正确的,链接在这里:

那么答案是否定的,它不会是线程安全的。

需要注意的是,虽然deque被说明是线程安全的,可以安全地添加和删除元素,但这并不意味着使用它的代码也是安全的。

因为主代码可能会在多个线程中请求底层迭代器的元素,所以为了让tee安全,你需要确保输入的集合和迭代器都是线程安全的。

20

更新! 在较新的 Python 2.7、3.7、3.8 及以上版本中,tee 引起的段错误问题已经修复。不过,你仍然需要自己管理并发访问,以确保线程安全。你可以参考我下面的解决方案。

简而言之

在 CPython 中,`itertools.tee` 只有在原始迭代器是用 C/C++ 实现的情况下才是线程安全的,也就是说,不能使用 **任何** Python 代码。

如果原始迭代器 it 是用 Python 编写的,比如类实例或生成器,那么 itertools.tee(it)不是线程安全的。在最好的情况下,你只会遇到一个异常(你肯定会遇到),在最坏的情况下,Python 会崩溃。

与其使用 tee,不如使用下面这个线程安全的包装类和函数:

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

接下来我会详细讲解 tee 什么时候是线程安全的,什么时候不是,以及原因。

可以正常工作的例子

我们来运行一些代码(这是 Python 3 的代码,如果是 Python 2,请用 `itertools.izip` 替代 `zip` 以获得相同的行为):
>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(elem)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999

itertools.count 完全是用 C 编写的,位于 CPython 项目的 Modules/itertoolsmodule.c 文件中,所以它工作得很好。

列表、元组、集合、范围、字典(键、值和项)、collections.defaultdict(键、值和项)等也是如此。

不工作示例 - 生成器

一个非常简单的例子是使用生成器:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

是的,tee 是用 C 编写的,而且 GIL(全局解释器锁)一次只执行一条字节码。但上面的例子表明,这并不足以确保线程安全。在这个过程中发生了以下情况:

  1. 两个线程对它们的 tee_object 实例调用了相同次数的 next
  2. 线程 1 调用 next(a)
  3. 它需要获取一个新元素,所以线程 1 现在调用 next(gen)
  4. gen 是用 Python 编写的。在,比如说,gen.__next__ 的第一条字节码上,CPython 决定切换线程,
  5. 线程 2 恢复并调用 next(b)
  6. 它需要获取一个新元素,所以它调用 next(gen)
  7. 由于 gen.__next__ 已经在线程 1 中运行,我们就会遇到异常。

不工作示例 - 迭代器对象

好吧,也许在 `tee` 中使用生成器本身就不安全。然后我们运行一个使用迭代器对象的变体:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

上述代码在 Python 2.7.13 和 3.6(可能所有 CPython 版本)中崩溃,在 Ubuntu、Windows 7 和 OSX 上也是如此。我暂时不想揭示原因,再等一步。

如果我在迭代器中使用锁呢?

也许上述代码崩溃是因为我们的迭代器本身不安全。让我们加一个锁,看看会发生什么:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

在我们的迭代器中添加一个锁并不足以让 tee 变得线程安全。

为什么 tee 不是线程安全的

问题的关键在于 CPython 中 teedataobjectgetitem 方法,位于 Modules/itertoolsmodule.c 文件中。tee 的实现非常酷,采用了一种节省内存的优化:tee 返回“tee 对象”,每个对象保存一个指向头部 teedataobject 的引用。这些对象就像链表中的链接,但它们不是保存单个元素,而是保存 57 个。这对我们来说并不重要,但就是这样。以下是 teedataobjectgetitem 函数:

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}

当请求一个元素时,teedataobject 会检查是否有准备好的元素。如果有,它就返回这个元素。如果没有,它就会调用原始迭代器的 next。如果迭代器是用 Python 编写的,这里可能会出现挂起的情况。所以问题在于:

  1. 两个线程调用了相同次数的 next
  2. 线程 1 调用 next(a),C 代码到达 PyIter_Next 调用。比如说,在 next(gen) 的第一条字节码上,CPython 决定切换线程。
  3. 线程 2 调用 next(b),由于它仍然需要一个新元素,C 代码也到达 PyIter_Next 调用,

此时两个线程都在同一个位置,itdo->numread 的值是相同的。注意 tdo->numread 是一个变量,用来跟踪 teedataobject 应该写入的 57 个单元中的位置。

  1. 线程 2 完成了对 PyIter_Next 的调用并返回了一个元素。此时 CPython 决定再次切换线程,

  2. 线程 1 恢复,完成了对 PyIter_Next 的调用,然后运行以下两行:

         tdo->numread++;
         tdo->values[i] = value;
    
  3. 但线程 2 已经设置了 tdo->values[i]

这已经足够证明 tee 不是线程安全的,因为我们丢失了线程 2 放入 tdo->values[i] 的值。但这并没有解释崩溃的原因。

假设 i 是 56。由于两个线程都调用了 tdo->numread++,它现在变成了 58 - 超过了 57,tdo->values 的分配大小。在线程 1 继续执行后,tdo 对象没有更多的引用,准备被删除。以下是 teedataobject 的清除函数:

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}

在标记为“问题”的那一行,CPython 会尝试清除 tdo->values[57]。这就是崩溃发生的地方。好吧,有时是这样。崩溃的地方不止一个,我只是想展示一个。

现在你知道了 - itertools.tee 不是线程安全的。

一个解决方案 - 外部锁

与其在迭代器的 __next__ 中加锁,不如把锁放在 tee.__next__ 周围。这意味着每次调用 teedataobject.__getitem__ 方法时,只有一个线程会执行。我在这个回答的开头提供了一个简短的实现。这是一个线程安全的 tee 的替代方案。唯一没有实现的功能是序列化(pickling),因为锁不能被序列化,这样添加就不简单了。不过,当然,这是可以做到的。

0

在C-Python中,itertools.tee()这个函数和它返回的迭代器是用C语言写的。这意味着全局解释器锁(GIL)会保护它,防止多个线程同时调用。这样做可能会让它正常工作,也不会让解释器崩溃,但并不能保证它是线程安全的。

简单来说,就是别冒这个险。

撰写回答