Python 多进程自馈消费者锁定永远

1 投票
1 回答
801 浏览
提问于 2025-04-17 05:22

问题在于,消费者一直没有退出,它只是挂在那里什么都不做。代码的设计初衷是这样的:

首先创建一个队列,然后把一些任务数据放进去。接着创建指定数量的消费者来处理这些数据。当一个消费者发现队列是空的,它不能离开,因为还有可能有其他消费者会往队列里放东西。不过,它可以在 consumers_finished 列表中标记自己没有工作了。这些消费者会一直循环,直到所有的工作者都表示他们完成了工作。因为消费者会往队列里放任务,所以我们不知道会有多少工作要做。我看到过一些相关的内容,但不太清楚如果进程自己给自己提供任务,是否会一直等待下去。

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        while not all(flag for flag in self.consumers_finished.values()):
            task_data = self.task_queue.get()
            if not task_data:
                self.consumers_finished[self.name] = True
                continue

            self.consumers_finished[self.name] = False
            task_result = self.do_some_processing(task_data)
            self.task_queue.put(task_result)


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = multiprocessing.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results

1 个回答

2

看起来好好睡一觉真的很有帮助,清醒的头脑能做更多事情。总之,我在研究了Python的文档后找到了一个解决方案。

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        while not all(flag for flag in self.consumers_finished.values()):
            try:
                task = self.todo_queue.get(False)
                self.consumers_finished[self.name] = False
            except QueueEmpty:
                self.consumers_stopped[self.name] = True
                continue

            task_result = self.do_some_processing(task_data)
            self.task_queue.put(task_result)


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = manager.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results

这是Python文档中的一部分,我觉得它解释了我的问题:

警告:如上所述,如果一个子进程在队列中放了项目(而且它没有使用JoinableQueue.cancel_join_thread()),那么这个进程在所有缓冲的项目被处理完之前是不会结束的。这意味着如果你试图等待这个进程结束,可能会出现死锁,除非你确定所有放入队列的项目都已经被处理了。同样,如果子进程不是守护进程,那么当父进程尝试等待所有非守护子进程结束时,可能会在退出时卡住。注意,使用管理器创建的队列没有这个问题。请参见编程指南。

所以我只是换了队列,现在是由管理器创建的,在消费者的运行方法中,任务是以不同的方式从队列中取出的,具体代码见下文。

撰写回答