是由于itertools.tee()线程安全(Python)

2024-03-29 08:49:59 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有以下Python代码:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

问题是,如果我开始在一个线程中迭代“a”,同时在另一个线程中迭代“b”,会有什么问题吗?显然,a和b共享一些数据(原始的iterable,+一些额外的东西,内部缓冲区或其他东西)。那么,a.next()和b.next()在访问共享数据时是否会执行适当的锁定?在


Tags: 数据代码fromimportanexamplecountiterable
3条回答

在C-Python中,itertools.tee()及其返回的迭代器是用C代码实现的。这意味着GIL应该保护它不被多个线程同时调用。它可能会正常工作,并且不会使解释器崩溃,但不能保证是线程安全的。在

简单地说,不要冒险。在

如果文档中显示了等效代码,则此处:

是正确的,那么不,它将不是线程安全的。在

注意,尽管deque被记录为具有线程安全的append和pop,但它并不保证使用它的代码。在

由于主代码最终可能会向底层迭代器请求多个线程上的元素,因此需要有一个线程安全的集合和迭代器作为输入,这样tee才是安全的。在

tl;dr

< CPython >中,{{CD1>}是线程安全的,EM>当且仅当EME>在C/C++中实现原迭代器时,即不使用<强>任意< /强> Python。在

如果原始迭代器it是用python编写的,就像类实例或生成器,那么itertools.tee(it)是线程安全的。在最好的情况下,您只会得到一个异常(您会),而在最坏的情况下,python将崩溃。在

这里不是使用tee,而是一个线程安全的包装类和函数:

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

现在我将详细介绍tee是线程安全的还是非线程安全的,以及原因。在

举例说明没问题

让我们运行一些代码(这是python3代码,对于python2,使用itertools.izip而不是{}来具有相同的行为):

^{pr2}$

在itertools.count在Cpython项目的文件^ {CD8}}中完全是用C++编写的,所以它工作正常。在

对于:列表、元组、集合、范围、字典(键、值和项)、collections.defaultdict(键、值和项)以及其他一些元素也是如此。在

不起作用的例子-发电机

一个非常简短的例子是使用发电机:

>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

是的,^ {< CD4>}是用C++编写的,吉尔每次执行一个字节代码是正确的。但是上面的例子表明,这不足以确保线程安全。在这条线的某个地方发生了这样的事情:

  1. 这两个线程在其tee\u对象实例上调用next的次数相同
  2. 线程1调用next(a)
  3. 它需要一个新元素,所以线程1现在调用next(gen)
  4. gen是用python编写的。例如,gen.__next__CPython的第一个字节码决定切换线程
  5. 线程2继续并调用next(b)
  6. 它需要获取一个新元素,因此它调用next(gen)
  7. 由于gen.__next__已经在线程1中运行,我们得到一个异常。在

不起作用的示例-迭代器对象

好吧,也许在tee中使用生成器不是线程安全的。然后我们运行上述代码的一个变体,它使用迭代器对象:

>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

上面的代码在python2.7.13和3.6(可能还有所有cpython版本)中崩溃,运行在Ubuntu、windows7和OSX上。我现在还不想透露原因,再往前走一步。在

如果我在迭代器中使用锁呢?在

也许上面的代码崩溃是因为迭代器本身不是线程安全的。我们添加一个锁,看看会发生什么:

>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

在迭代器中添加锁不足以使tee线程安全。在

为什么T恤不安全

问题的关键在于CPython的文件Modules/itertoolsmodule.cgetitemgetitem方法。tee的实现非常酷,有一个可以节省RAM调用的优化:tee返回“tee objects”,每个对象都保存对head的引用teedataobject。这些链接又类似于链接列表中的链接,但它们不是包含一个元素而是包含57个元素。这对我们的目的来说并不重要,但它就是它的本质。这是getitemgetitem函数:

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}

当请求元素时,teedataobject检查它是否准备好了一个。如果是,则返回它。如果没有,则调用原始迭代器上的next。如果迭代器是用python编写的,代码可以挂起。所以问题是:

  1. 两个线程调用next的次数相同
  2. 线程1调用next(a),C代码到达上面的PyIter_Next调用。例如,在next(gen)的第一个字节码上,CPython决定切换线程。在
  3. 线程2调用next(b),由于它仍然需要一个新元素,所以C代码得到PyIter_Next调用

此时,两个线程位于同一个位置,itdo->numread的值相同。请注意,tdo->numread只是一个变量,用于跟踪57个单元格链接的位置teedataobject应该写入next。在

  1. 线程2完成对PyIter_Next的调用并返回一个元素。在某个时候,CPython决定再次切换线程
  2. 线程1继续,完成对PyIter_Next的调用,然后运行两行:

        tdo->numread++;
        tdo->values[i] = value;
    
  3. 但是线程2已经设置了tdo->values[i]

这已经足以证明tee不是线程安全的,因为我们丢失了线程2在tdo->values[i]中的值。但这不能解释撞车的原因。在

假设{}是56岁。由于两个线程都调用tdo->numread++,它现在达到了58—比57大,即分配的tdo->values的大小。在线程1继续之后,对象tdo不再有引用,可以删除。这是teedataobject的清除函数:

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <  - PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}

在标记为“PROBLEM”的行,CPython将尝试清除tdo->values[57]。这就是车祸发生的地方。嗯,有时候。撞车的地方不止一个,我只想展示一个。在

现在您知道-itertools.tee不是线程安全的。在

一个解决方案-外部锁

我们不必在迭代器的__next__内锁定,而是在tee.__next__周围加一个锁。这意味着整个teedataobject.__getitem__方法每次都将由一个线程调用。我在这个答案的开头给出了一个简短的实现。它是线程安全的tee的直接替代品。它唯一没有实现tee所做的事情是酸洗。因为锁是不可拾取的,所以添加这个并不容易。但是,当然,这是可以做到的。在

相关问题 更多 >