<h2>tl;dr</h2>
< CPython >中,{{CD1>}是线程安全的,EM>当且仅当EME>在C/C++中实现原迭代器时,即不使用<强>任意< /强> Python。在</p>
<p>如果原始迭代器<code>it</code>是用python编写的,就像类实例或生成器,那么<code>itertools.tee(it)</code>是线程安全的。在最好的情况下,您只会得到一个异常(您会),而在最坏的情况下,python将崩溃。在</p>
<p>这里不是使用<code>tee</code>,而是一个线程安全的包装类和函数:</p>
<pre><code>class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
</code></pre>
<p>现在我将详细介绍<code>tee</code>是线程安全的还是非线程安全的,以及原因。在</p>
<h2>举例说明没问题</h2>
<p>让我们运行一些代码(这是python3代码,对于python2,使用<code>itertools.izip</code>而不是{<cd7>}来具有相同的行为):</p>
^{pr2}$
<p>在itertools.count在Cpython项目的文件^ {CD8}}中完全是用C++编写的,所以它工作正常。在</p>
<p>对于:列表、元组、集合、范围、字典(键、值和项)、<code>collections.defaultdict</code>(键、值和项)以及其他一些元素也是如此。在</p>
<h2>不起作用的例子-发电机</h2>
<p>一个非常简短的例子是使用发电机:</p>
<pre><code>>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
</code></pre>
<P>是的,^ {< CD4>}是用C++编写的,吉尔每次执行一个字节代码是正确的。但是上面的例子表明,这不足以确保线程安全。在这条线的某个地方发生了这样的事情:</p>
<ol>
<li>这两个线程在其tee\u对象实例上调用<code>next</code>的次数相同</li>
<li>线程1调用<code>next(a)</code></li>
<li>它需要一个新元素,所以线程1现在调用<code>next(gen)</code></li>
<li><code>gen</code>是用python编写的。例如,<code>gen.__next__</code>CPython的第一个字节码决定切换线程</li>
<li>线程2继续并调用<code>next(b)</code></li>
<li>它需要获取一个新元素,因此它调用<code>next(gen)</code></li>
<li>由于<code>gen.__next__</code>已经在线程1中运行,我们得到一个异常。在</li>
</ol>
<h2>不起作用的示例-迭代器对象</h2>
<p>好吧,也许在<code>tee</code>中使用生成器不是线程安全的。然后我们运行上述代码的一个变体,它使用迭代器对象:</p>
<pre><code>>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
</code></pre>
<p>上面的代码在python2.7.13和3.6(可能还有所有cpython版本)中崩溃,运行在Ubuntu、windows7和OSX上。我现在还不想透露原因,再往前走一步。在</p>
<h2>如果我在迭代器中使用锁呢?在</h2>
<p>也许上面的代码崩溃是因为迭代器本身不是线程安全的。我们添加一个锁,看看会发生什么:</p>
<pre><code>>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
</code></pre>
<p>在迭代器中添加锁不足以使<code>tee</code>线程安全。在</p>
<h2>为什么T恤不安全</h2>
<p>问题的关键在于CPython的文件<code>Modules/itertoolsmodule.c</code>中<code>getitem</code>的<code>getitem</code>方法。<code>tee</code>的实现非常酷,有一个可以节省RAM调用的优化:<code>tee</code>返回“tee objects”,每个对象都保存对head的引用<code>teedataobject</code>。这些链接又类似于链接列表中的链接,但它们不是包含一个元素而是包含57个元素。这对我们的目的来说并不重要,但它就是它的本质。这是<code>getitem</code>的<code>getitem</code>函数:</p>
<pre><code>static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
</code></pre>
<p>当请求元素时,<code>teedataobject</code>检查它是否准备好了一个。如果是,则返回它。如果没有,则调用原始迭代器上的<code>next</code>。如果迭代器是用python编写的,代码可以挂起。所以问题是:</p>
<ol>
<li>两个线程调用<code>next</code>的次数相同</li>
<li>线程1调用<code>next(a)</code>,C代码到达上面的<code>PyIter_Next</code>调用。例如,在<code>next(gen)</code>的第一个字节码上,CPython决定切换线程。在</li>
<li>线程2调用<code>next(b)</code>,由于它仍然需要一个新元素,所以C代码得到<code>PyIter_Next</code>调用</li>
</ol>
<p>此时,两个线程位于同一个位置,<code>i</code>和<code>tdo->numread</code>的值相同。请注意,<code>tdo->numread</code>只是一个变量,用于跟踪57个单元格链接的位置<code>teedataobject</code>应该写入next。在</p>
<ol start=“4”>
<li>线程2完成对<code>PyIter_Next</code>的调用并返回一个元素。在某个时候,CPython决定再次切换线程</li>
<li><p>线程1继续,完成对<code>PyIter_Next</code>的调用,然后运行两行:</p>
<pre><code> tdo->numread++;
tdo->values[i] = value;
</code></pre></li>
<li><p>但是线程2已经设置了<code>tdo->values[i]</code>!</p></li>
</ol>
<p>这已经足以证明<code>tee</code>不是线程安全的,因为我们丢失了线程2在<code>tdo->values[i]</code>中的值。但这不能解释撞车的原因。在</p>
<p>假设{<cd37>}是56岁。由于两个线程都调用<code>tdo->numread++</code>,它现在达到了58—比57大,即分配的<code>tdo->values</code>的大小。在线程1继续之后,对象<code>tdo</code>不再有引用,可以删除。这是<code>teedataobject</code>的清除函数:</p>
<pre><code>static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]); // < - PROBLEM!!!
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
</code></pre>
<p>在标记为“PROBLEM”的行,CPython将尝试清除<code>tdo->values[57]</code>。这就是车祸发生的地方。嗯,有时候。撞车的地方不止一个,我只想展示一个。在</p>
<p>现在您知道-<code>itertools.tee</code>不是线程安全的。在</p>
<h2>一个解决方案-外部锁</h2>
<p>我们不必在迭代器的<code>__next__</code>内锁定,而是在<code>tee.__next__</code>周围加一个锁。这意味着整个<code>teedataobject.__getitem__</code>方法每次都将由一个线程调用。我在这个答案的开头给出了一个简短的实现。它是线程安全的<code>tee</code>的直接替代品。它唯一没有实现<code>tee</code>所做的事情是酸洗。因为锁是不可拾取的,所以添加这个并不容易。但是,当然,这是可以做到的。在</p>