树中的生产者消费者队列

1 投票
1 回答
521 浏览
提问于 2025-04-15 14:45

我正在学习如何使用Python的队列来在节点之间发送和接收短消息。我正在模拟一组以树形结构排列的节点。我希望这些节点中的一些能够向它们的父节点发送固定大小的数据。一旦父节点从一些子节点接收到数据,它就会“处理”这些数据,并向它的父节点发送一个“汇总”包……依此类推。

为了实现这个目标,有人告诉我队列可以帮助我传递消息,快速了解后我觉得这确实能满足我的需求。不过,我发现实现一个基本的设置有点困难,想测试一下我的理解——一个生产者(生成消息包)和一个消费者(工作线程,从队列中取出任务并处理)。

我在这里和其他地方搜索并阅读了很多帖子……我理解了所有队列的方法。但我仍然不明白如何将一个队列与两个特定的节点关联或绑定在一起。

我想让节点1和节点2向节点3发送消息。对于这个基本场景,我必须以某种方式创建一个(或两个)队列,并将其“关联”到节点1和节点2,这样它们就可以把消息放入队列中,而节点3也必须“监听”这个队列,以“获取”或取出任务。

如果节点1和节点2是“生产者”,我应该将它们作为两个独立的线程,而节点3则是第三个线程。然后,我必须创建一个队列Q。接着,节点1和节点2会生成消息,并将它们“放入”队列中。节点3必须以某种方式被通知或唤醒,以“获取”这些来自Q的消息并进行处理。

我看过

http://docs.python.org/library/queue.html#module-Queue

这是我目前的(未经测试的)代码:

=================================================================

import threading
import queue

q = Queue.Queue(2)   # create a queue of size 2.

# worker is node-3 which received msgs from 1 and 2 and fuses them.
def worker():
    while True:
        msg = q.get()
        fuse_msgs(msg)
        q.task_done()

# generate 3 worker threads. Node-3 could be both a producer and consumer. Each
# thread represents each node that will be a potential producer/consumer or both.
# need something like t1 - thread-1 for node-1 ...

for i in range(3):
     t = Thread(target=worker)
     t.setDaemon(True)
     t.start()

# How can I make node-1 and node-2 to put items into a specified queue???



for msg in source():
    q.put(item)

q.join()  

=========================================

我这样做是对的吗?请告诉我哪里做错了,或者我有什么误解……

任何帮助都非常感谢。我有成百上千的节点和链接。如果我能把这些基本概念搞清楚,我就能顺利继续下去……

谢谢,
B.R.Srini。

1 个回答

1

我不是专门在评论你的Python代码,而是针对你提到的队列设计来说,似乎在你描述的节点1、2、3的场景中,你只需要一个队列。简单来说,你有一个队列,节点1和节点2往里面放消息,节点3从里面读取消息。

你可以让节点3在这个队列上进行“阻塞式”获取,这样它就会一直等着,直到看到有消息可以处理,而节点1和节点2可以尽可能快地生成它们的输出。

根据每个节点的处理速度和你预期的流量模式,你可能希望队列的深度超过2条消息,这样生产消息的节点就不用等着队列被处理完。

撰写回答