有没有办法优雅地退出一个处理(无限)生成器数据的线程?

3 投票
4 回答
501 浏览
提问于 2025-04-16 04:51

这里有个问题:我有一个线程,它在运行一个循环,从一个生成器中读取数据,并对这些数据进行处理。这个生成器总是有数据进来,所以它不会抛出StopIteration这个异常。我想从主线程干净利落地停止这个线程(也就是退出正在处理生成器数据的循环)。下面是这个场景的一个例子,结果是正确的,但有一些限制,我会在下面描述:

import threading
import time
import random

def add():
    r = random.Random()
    i = 0
    while True:
        sleep_time = r.randint(0, 3)
        time.sleep(sleep_time)
        yield i
        i = i + 1

class Test(object):

    def __init__(self):
        self.func = add
        self.stopped = False

    def stop(self):
        self.stopped = True

    def run(self):
        self.generator = self.func()
        for x in self.generator:
            print x
            if self.stopped is True:
                break
        print 'DONE'


tester = Test()
thread = threading.Thread(target=tester.run)
thread.daemon = True
thread.start()
time.sleep(10)
print 'Stopping thread'
tester.stop()
print 'Complete, but should stop immediately!'

现在,虽然上面的例子可以正常工作(显然,上面的代码没有解决self.stopped的竞争条件,但这不是我现在要解决的问题,所以我把那部分代码省略了),我遇到的问题是,在我真实的代码中,生成器并不总是立即有数据,所以在self.stopped被设置和break语句实际执行之间可能会有很长的暂停。因此,我的问题的核心是,我希望能够尽快干净地退出这个循环,而不是等生成器有数据才能退出,显然,上面的解决方案无法做到这一点。

有没有希望呢?这是个相当棘手的问题,可能没有干净的解决方案,但任何帮助都将非常感激。

编辑:为了澄清,在我的真实应用中,我有一个生成器(我们称之为G),它从内核驱动程序中获取数据。这些数据将被发送到服务器,但在套接字尝试连接到服务器时(服务器可能并不总是运行),我想处理来自驱动程序的数据(连接后就不再处理了)。所以我启动了一个线程来从G获取数据并处理,同时主线程尝试连接到服务器。一旦连接成功,理想情况下应该发生以下情况:

我暂停G的执行,退出线程,并将同一个 G实例传递给另一个函数,直接将数据发送到服务器。

根据下面的回答/评论,我认为这是不可能的,除非销毁G,因为没有办法干净地暂停一个正在执行的生成器。

抱歉让你困惑了。

4 个回答

0

听起来你真正想要的是协程,而不是生成器。可以看看David Beazley的那本让人脑洞大开的《协程与并发的奇妙课程》。虽然里面的信息比你需要的多得多,但应该能让你对自己想做的事情有更清晰的理解。

0

首先,生成器可能并不是你需要担心的重点,可以先放一放。

在Python中,解决生产者-消费者问题的标准方法是使用内置的queue模块。这个模块就像一个中介,允许你的生产者线程不断从内核获取和处理数据到队列中,而消费者线程则可以将队列中的数据发送到服务器,这样它们之间就不会因为各自的输入输出操作而互相干扰。

下面是这个基本思路的一个简单示意,具体细节没有填上:

from queue import Queue

class Application(object):

    def __init__(self):
        self.q = Queue()
        self.running = False

    # From kernel to queue
    def produce(self):
        while self.running:
            data = read_from_kernel()
            self.q.put(data)

    # From queue to server
    def consume(self):
        while self.running:
            data = self.q.get()
            send_to_server(data)

    # Start producer thread, then consume
    def run():
        try:
            self.running = True
            producer = Thread(target=self.produce)
            producer.start()
            self.consume()
        finally:
            self.running = False

如果self.running被设置为False,上面的代码中的produce方法仍然会在read_from_kernel里面阻塞,直到下一个返回值才会退出,但这在Python中几乎无能为力。你使用的系统调用必须以某种方式支持这一点:如果是实际的read,那么你可以考虑以下选项:

  • 设置一个短的超时时间,并加上重试处理
  • 使用非阻塞的输入输出(不过在这种情况下,你可能想要研究一下基于这个概念的框架,比如Twisted Python
0

你需要让自定义的生成器具备超时的功能。从概念上来说,

wait(1 sec);

而不仅仅是

wait();

我不确定这是否可行(给我们看看你的生成器代码)。比如说,如果你是从管道或者套接字读取数据,就不要这样写

giveMeSomeBytes( buffer);  // wait indefinately

代码

giveMeSomeBytesOrTimeout( buffer, howLongToWait); // wait for a while and 
                                                  // then go see if we should dies

撰写回答