有没有办法优雅地退出一个处理(无限)生成器数据的线程?
这里有个问题:我有一个线程,它在运行一个循环,从一个生成器中读取数据,并对这些数据进行处理。这个生成器总是有数据进来,所以它不会抛出StopIteration
这个异常。我想从主线程干净利落地停止这个线程(也就是退出正在处理生成器数据的循环)。下面是这个场景的一个例子,结果是正确的,但有一些限制,我会在下面描述:
import threading
import time
import random
def add():
r = random.Random()
i = 0
while True:
sleep_time = r.randint(0, 3)
time.sleep(sleep_time)
yield i
i = i + 1
class Test(object):
def __init__(self):
self.func = add
self.stopped = False
def stop(self):
self.stopped = True
def run(self):
self.generator = self.func()
for x in self.generator:
print x
if self.stopped is True:
break
print 'DONE'
tester = Test()
thread = threading.Thread(target=tester.run)
thread.daemon = True
thread.start()
time.sleep(10)
print 'Stopping thread'
tester.stop()
print 'Complete, but should stop immediately!'
现在,虽然上面的例子可以正常工作(显然,上面的代码没有解决self.stopped
的竞争条件,但这不是我现在要解决的问题,所以我把那部分代码省略了),我遇到的问题是,在我真实的代码中,生成器并不总是立即有数据,所以在self.stopped
被设置和break
语句实际执行之间可能会有很长的暂停。因此,我的问题的核心是,我希望能够尽快干净地退出这个循环,而不是等生成器有数据才能退出,显然,上面的解决方案无法做到这一点。
有没有希望呢?这是个相当棘手的问题,可能没有干净的解决方案,但任何帮助都将非常感激。
编辑:为了澄清,在我的真实应用中,我有一个生成器(我们称之为G),它从内核驱动程序中获取数据。这些数据将被发送到服务器,但在套接字尝试连接到服务器时(服务器可能并不总是运行),我想处理来自驱动程序的数据(连接后就不再处理了)。所以我启动了一个线程来从G获取数据并处理,同时主线程尝试连接到服务器。一旦连接成功,理想情况下应该发生以下情况:
我暂停G的执行,退出线程,并将同一个 G实例传递给另一个函数,直接将数据发送到服务器。
根据下面的回答/评论,我认为这是不可能的,除非销毁G,因为没有办法干净地暂停一个正在执行的生成器。
抱歉让你困惑了。
4 个回答
听起来你真正想要的是协程,而不是生成器。可以看看David Beazley的那本让人脑洞大开的《协程与并发的奇妙课程》。虽然里面的信息比你需要的多得多,但应该能让你对自己想做的事情有更清晰的理解。
首先,生成器可能并不是你需要担心的重点,可以先放一放。
在Python中,解决生产者-消费者问题的标准方法是使用内置的queue
模块。这个模块就像一个中介,允许你的生产者线程不断从内核获取和处理数据到队列中,而消费者线程则可以将队列中的数据发送到服务器,这样它们之间就不会因为各自的输入输出操作而互相干扰。
下面是这个基本思路的一个简单示意,具体细节没有填上:
from queue import Queue
class Application(object):
def __init__(self):
self.q = Queue()
self.running = False
# From kernel to queue
def produce(self):
while self.running:
data = read_from_kernel()
self.q.put(data)
# From queue to server
def consume(self):
while self.running:
data = self.q.get()
send_to_server(data)
# Start producer thread, then consume
def run():
try:
self.running = True
producer = Thread(target=self.produce)
producer.start()
self.consume()
finally:
self.running = False
如果self.running
被设置为False,上面的代码中的produce
方法仍然会在read_from_kernel
里面阻塞,直到下一个返回值才会退出,但这在Python中几乎无能为力。你使用的系统调用必须以某种方式支持这一点:如果是实际的read
,那么你可以考虑以下选项:
- 设置一个短的超时时间,并加上重试处理
- 使用非阻塞的输入输出(不过在这种情况下,你可能想要研究一下基于这个概念的框架,比如Twisted Python)
你需要让自定义的生成器具备超时的功能。从概念上来说,
wait(1 sec);
而不仅仅是
wait();
我不确定这是否可行(给我们看看你的生成器代码)。比如说,如果你是从管道或者套接字读取数据,就不要这样写
giveMeSomeBytes( buffer); // wait indefinately
代码
giveMeSomeBytesOrTimeout( buffer, howLongToWait); // wait for a while and
// then go see if we should dies