Is Python's Queue incomplete, or is my design flawed?

Published 2024-04-26 00:18:59


Python's Queue is useful for multithreading, but it does not support stopping the worker threads once the queue will stay empty indefinitely.

For example, consider the following:

from queue import Queue
from random import random
from threading import Thread
import time

queue = Queue()

def process(payload):
    time.sleep(random())

def work():
    while True:
        payload = queue.get()
        try:
            process(payload)
        except Exception:
            print("TERROR ERROR!")
        finally:
            queue.task_done()

threads = dict()
for thread_id in range(10):
    threads[thread_id] = Thread(target=work)
    threads[thread_id].daemon = True
    threads[thread_id].start()

for payload in range(100):
    queue.put(payload)

queue.join()

So this works, but not really. queue.join() waits until every item has been reported done and then the main thread finishes, but the worker threads keep waiting indefinitely. If this were the end of a (unix) process we could of course leave the cleanup to the operating system, but if the program keeps running, these waiting threads pile up and waste resources.

So then we implement a sentinel, an EOQ, a bottom, or whatever you want to call it:

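A minimal sketch of that variant, assuming None is used as the sentinel and one sentinel is enqueued per worker (names such as NUM_WORKERS are illustrative):

from queue import Queue
from random import random
from threading import Thread
import time

NUM_WORKERS = 10
queue = Queue()

def process(payload):
    time.sleep(random())

def work():
    while True:
        payload = queue.get()
        if payload is None:           # sentinel: this worker is done
            queue.task_done()
            break
        try:
            process(payload)
        finally:
            queue.task_done()

threads = [Thread(target=work, daemon=True) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for payload in range(100):
    queue.put(payload)

for _ in range(NUM_WORKERS):          # one sentinel per worker
    queue.put(None)

queue.join()
for t in threads:
    t.join()                          # each worker exits after consuming its sentinel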

This is a better solution, because the threads now actually stop. However, the code that injects the sentinels is clumsy and error-prone. Consider what happens if I accidentally put too few on the queue, or if one worker accidentally consumes two, so that other workers never receive theirs.

Or:

class AlreadyFinished(Exception):
    pass


class FiniteQueue(Queue):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.finished = False

    def put(self, item, *args, **kwargs):
        if self.finished:
            raise AlreadyFinished()
        super().put(item, *args, **kwargs)

    def set_finished(self):
        self.finished = True

    def get(self, *args, **kwargs):
        if self.finished:
            raise AlreadyFinished()
        return super().get(*args, **kwargs)

Obviously I am being lazy here and have not made the put() method thread-safe, although that is certainly doable. This way, the workers can simply catch AlreadyFinished and stop.

When all payloads have been entered, the main thread can simply call set_finished(). The queue can then detect that it will never deliver another payload and report that to the workers (or, if you prefer, to the consumers).
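A rough sketch of that intended usage, building on the FiniteQueue above (illustrative only, not code from the question):

def work():
    while True:
        try:
            payload = queue.get()       # assumed: queue is a FiniteQueue instance
        except AlreadyFinished:
            break                       # the queue will never yield another payload
        try:
            process(payload)
        finally:
            queue.task_done()

# producer side: enqueue everything, then mark the queue as finished
for payload in range(100):
    queue.put(payload)
queue.set_finished()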

Why doesn't Python's Queue provide set_finished() functionality? It would not interfere with the endless-queue use case, but it would support finite processing pipelines.

Am I missing an obvious flaw in this design? Is this something one shouldn't want? Is there a simpler alternative than the FiniteQueue sketched above?


3 Answers

To solve the sentinel problem, I put as many sentinels on the queue as there are worker threads. When a worker detects a sentinel it exits, so duplicates are impossible. As the sentinel I used a reference to a function that is never actually called.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  test_queue.py
#  
#  Copyright 2015 John Coppens <john@jcoppens.com>
#  
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#  
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#  
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#  
#

from Queue import Queue
from threading import Thread
import time, random

NR_WORKERS = 10
queue = Queue()

# The sentinel function below is not necessary - you can use None as the sentinel,
# though such a function might be useful if the worker wants to do something
# after the last job (like wash its hands :)  (Thanks Darren Ringer)

# def sentinel():
#    pass

def process(payload):
    time.sleep(random.random())
    print payload

def work():
    while(True):
        payload = queue.get()
        print "Got from queue: ", payload
        #if payload == sentinel:
        if payload is None:         # See comment above
            queue.task_done()
            break
        process(payload)
        queue.task_done()

threads = dict()
for worker_id in range(NR_WORKERS):
    print "Creating worker ", worker_id
    threads[worker_id] = Thread(target=work)
    #threads[worker_id].daemon = False
    threads[worker_id].start()

for payload in range(100):
    queue.put(payload)

for stopper in range(NR_WORKERS):
    # queue.put(sentinel)             # See comment at def sentinel()
    queue.put(None)

queue.join()

Edit: Thanks to @DarrenRinger for pointing out that the sentinel function is not needed (plain None works as well). I had tried that before, but it failed (I suspect because of a different problem).

Instead of a sentinel, I suggest using an Event object shared by all workers, to avoid accidentally mixing up data and sentinels. Also make sure to do a blocking queue.get() with a timeout so that resources aren't wasted.

from __future__ import print_function
from threading import Thread, Event, current_thread
from Queue import Queue
import time

queue = Queue()
evt = Event()

def process(payload):
    time.sleep(1)

def work():
    tid = current_thread().name
    # keep trying until signaled to stop
    while not evt.is_set():
        try:
            # block for at most 1 second
            payload = queue.get(True, 1)
            process(payload)
        except Exception as e:
            print("%s thread exception %s" % (tid, e))
        else:
            # calling task_done() in a finally block may cause too many calls,
            # resulting in an exception; only call it once a task has actually been done
            queue.task_done()

threads = dict()
for thread_id in range(10):
    threads[thread_id] = Thread(target=work)
    threads[thread_id].daemon = True
    threads[thread_id].start()

for payload in range(10):
    queue.put(payload)

queue.join()
# all workers will end in approx 1 second at most
evt.set()

Your approach with a sentinel object is correct and fine. It is impossible for a worker thread to accidentally process two sentinel objects, because its processing loop breaks as soon as it encounters one.

The FiniteQueue approach will not work as written, because setting the finished flag does not wake up a worker that is blocked inside the super().get(....) call.

This is a common problem in most programming languages that support threads: blocking while waiting on two conditions at once. In your case, the get() method needs to wait until either:

1) the queue becomes non-empty,

2) or the finished flag is set to true.

To do this correctly, the wait has to be aware of both conditions, which makes it harder to use ready-made objects that only know how to wait on one. Some languages support a form of thread interruption that wakes up a blocked thread; Python seems to lack such a mechanism.
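One way to express such a two-condition wait in Python is to guard the queue with a threading.Condition and have both put() and set_finished() notify it. The sketch below is only an illustration of that idea (it drops the task_done()/join() bookkeeping), not code from any of the answers:

from collections import deque
from threading import Condition


class AlreadyFinished(Exception):
    pass


class FiniteQueue(object):
    """Illustration only: get() waits until an item arrives OR finished is set."""

    def __init__(self):
        self._items = deque()
        self._cond = Condition()
        self._finished = False

    def put(self, item):
        with self._cond:
            if self._finished:
                raise AlreadyFinished()
            self._items.append(item)
            self._cond.notify()           # wake one waiting get()

    def set_finished(self):
        with self._cond:
            self._finished = True
            self._cond.notify_all()       # wake every blocked get()

    def get(self):
        with self._cond:
            # wait on both conditions at once
            while not self._items and not self._finished:
                self._cond.wait()
            if self._items:
                return self._items.popleft()
            raise AlreadyFinished()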
