使用嵌套类在进程间共享队列(Python)

1 投票
2 回答
1928 浏览
提问于 2025-04-16 06:16

我有个关于在Python中进程间共享队列的问题。下面,我有三个队列,一个主进程和三个内部进程。每个内部进程都会从不同的队列中添加和获取值(它们需要方便地访问这些队列)。

我觉得现在的代码是可以工作的,但这段代码是我即将进行的大项目的基础,我想确保没有更好的方法我不知道。我就是随便想出来这个主意的。从其他一些帖子来看,嵌套类在Python中似乎不是很常见。

有没有什么建议?这段代码是容易读还是难读?我应该放弃嵌套类,还是就这样保持不变?

谢谢大家。

class MainController(Process):
    def __init__(self):
        self.queue_stream   = Queue()
        self.queue_language = Queue()
        self.queue_expander = Queue()

        self.control_stream   = self.StreamController(self).start()
        self.control_language = self.LanguageController(self).start()
        self.control_expander = self.ExpanderController(self).start()

        print 'Launching Main Controller'

    class StreamController(Process):
        def __init__(self, main):
            Process.__init__(self)
            self.main = main
            print 'Launching Stream Controller'

        def run(self):
            while True:
                self.main.queue_stream.put('hello, stream')

    class LanguageController(Process):
        def __init__(self, main):
            Process.__init__(self)
            self.main = main
            print 'Launching Language Controller'

        def run(self):
            while True:
                print self.main.queue_stream.get()
                self.main.queue_language.put('hello, language')

    class ExpanderController(Process):
        def __init__(self, main):
            Process.__init__(self)
            self.main = main
            print 'Launching Expander Controller'

        def run(self):
            while True:
                print self.main.queue_language.get()
                self.main.queue_expander.put('hello, expander')

def main():
    # Launch all queues for the system
    control_main = MainController()

if __name__ == '__main__':
    print 'Launching System...'
    main()

2 个回答

1

我建议你使用线程模块,而不是进程。如果子类只是为了扩展父类的功能,才使用嵌套类。

class   WorkerThread(threading.Thread):

另一个建议是,在你的子线程之间使用共享锁,这样可以防止在共享队列上出现竞争条件。

            tasks_lock.acquire()
            ret = tasks_queue.get()
            tasks_lock.release()

你可以看看这个 示例

0

孩子现在必须了解父亲的实现方式。我对此表示反对。

def infinite_producer(queue):
    while True:
        queue.put('hello, stream')

class MainController(Process):
    def __init__(self):
        self.queue_stream   = Queue()
        self.queue_language = Queue()
        self.queue_expander = Queue()
        self.self.control_stream = Process(target=infinite_producer,self.queue_stream)

    def run(self):
        self.control_stream.start()

 #... etc you get the idea.

if __name__ == '__main__':
    print 'Launching System...'
    control_main = MainController()
    control_main.start()

撰写回答