多线程检查队列中的成员资格并停止线程

2024-04-25 21:15:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用2线程迭代列表。一个来自前导,另一个来自尾随,并在每次迭代中将元素放在Queue中。但是在将值放入Queue之前,我需要检查Queue中是否存在该值(当其中一个线程将该值放入Queue中时),因此当发生这种情况时,我需要停止线程并返回每个线程的遍历值列表。在

到目前为止,我一直在努力:

from Queue import Queue
from threading import Thread, Event

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

main_path = Queue()

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

def a(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'a'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

def b(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'b'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

g=['a','b','c','d','e','f','g','h','i','j','k','l']

t1 = ThreadWithReturnValue(target=a, args=(main_path,g))
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1]))
t2.start()
t1.start()
# Wait for all produced items to be consumed
print main_path.join()

我使用了ThreadWithReturnValue,它将创建一个返回值的自定义线程。在

对于会员资格检查,我使用了以下函数:

^{pr2}$

现在,如果我首先启动t1,然后t2我将得到12a,然后是一个b,那么它不会做任何事情,我需要手动终止python!在

但是如果我首先运行t2,然后t1我将得到以下结果:

b
b
b
b
 ab

ab
b

b
b
 b
a
a

所以我的问题是,为什么python在这种情况下会有所不同?如何终止线程并使它们彼此通信?在


Tags: pathinselfnonetargetreturnqueueis
2条回答

我认为有几点可以改进:

  1. 由于GIL,您可能希望使用^{}(而不是threading)模块。一般来说,CPython线程不会导致CPU密集型工作加速。(取决于问题的上下文,也有可能{}不会,但{}几乎肯定不会。)
  2. 像您的is_inqueue这样的函数可能会导致高争用。在

锁定时间与需要遍历的项目数呈线性关系:

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

所以,你可以做以下的事情。在

multiprocessing与共享的dict一起使用:

^{pr2}$

在每个函数中,检查以下项:

def p1(d):

    # Stuff

    if 'foo' in d:
        return 

在我们陷入更大的问题之前,您没有正确使用^{}。在

此函数的全部意义在于,向队列添加一组项目的生产者可以等到消费者或消费者完成对所有这些项目的处理。这是通过让消费者在完成他们用get完成的每一项工作后给task_done打电话。一旦task_done调用与put调用一样多,队列就完成了。您不会在任何地方执行get,更不用说task_done,因此队列不可能完成。所以,这就是为什么在两个线程完成后永远阻塞。在


这里的第一个问题是,除了实际的同步之外,线程几乎不做任何工作。如果他们所做的唯一的事情就是为了排队而打架,那么一次只能有一个人跑。在

当然,这在玩具问题中很常见,但你必须仔细考虑你真正的问题:

  • 如果您要做大量的I/O工作(监听套接字、等待用户输入等),线程的工作效果非常好。在
  • 如果您要做大量的CPU工作(计算素数),那么由于GIL的存在,线程在Python中不起作用,但是进程可以。在
  • 如果您实际上主要处理的是同步不同的任务,那么这两个任务都不能正常工作(流程也会更糟)。从线程的角度考虑问题可能仍然比较简单,但这将是最慢的方法。您可能需要研究协同程序;Greg Ewing有一个great demonstration如何使用yield from来使用协程来构建诸如调度器或许多参与者模拟之类的东西。在

接下来,正如我在上一个问题中提到的,要使线程(或进程)在共享状态下高效地工作,需要尽可能短地保持锁。在

所以,如果你要在锁下搜索整个队列,最好是固定时间搜索,而不是线性时间搜索。这就是为什么我建议使用类似OrderedSet的方法,而不是list,比如stdlib的Queue.Queue。那么这个函数:

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

…只阻塞队列很短的一秒,只够在表中查找哈希值,而不是足够长来比较队列中的每个元素与x。在


最后,我试着在你的另一个问题上解释一下比赛条件,但让我再试一次。在

您需要在代码中的每个完整“事务”周围都有一个锁,而不仅仅是单个操作。在

例如,如果您这样做:

^{pr2}$

…那么当你检查的时候,x总是有可能不在队列中,但是在你解锁和重新锁定之间的这段时间里,有人添加了它。这就是为什么两个线程都可能提前停止。在

要解决这个问题,你需要把整个东西锁起来:

with queue locked:
    if x is not in the queue:
        add x to the queue

当然,这与我之前所说的尽可能短的时间锁定队列的内容背道而驰。实际上,这就是使多线程处理变得困难的原因。编写安全代码很容易,只要可能需要,就锁定所有内容,但是代码最终只使用一个内核,而所有其他线程都被阻塞等待锁定。很容易编写快速的代码,尽可能简短地锁定所有内容,但这样就不安全了,而且到处都是垃圾值甚至崩溃。弄清楚什么是事务,如何将这些事务中的工作最小化,以及如何处理多个锁,您可能需要在不死锁的情况下使其正常工作……这并不容易。在

相关问题 更多 >