如何在Python中实现多进程优先队列?

6 投票
6 回答
6710 浏览
提问于 2025-04-15 13:58

有没有人知道我怎么在Python里实现一个多进程的优先队列?

6 个回答

1

虽然这不是一个直接的答案,但也许它能帮助你开发一个多进程的队列。

下面是我用Python的数组写的一个简单的优先队列类:

class PriorityQueue():
    """A basic priority queue that dequeues items with the smallest priority number."""
    def __init__(self):
        """Initializes the queue with no items in it."""
        self.array = []
        self.count = 0

    def enqueue(self, item, priority):
        """Adds an item to the queue."""
        self.array.append([item, priority])
        self.count += 1

    def dequeue(self):
        """Removes the highest priority item (smallest priority number) from the queue."""
        max = -1
        dq = 0
        if(self.count > 0):
            self.count -= 1

            for i in range(len(self.array)):
                if self.array[i][1] != None and self.array[i][1] > max:
                    max = self.array[i][1]

            if max == -1:
                return self.array.pop(0)
            else:
                for i in range(len(self.array)):
                    if self.array[i][1] != None and self.array[i][1] <= max:
                        max = self.array[i][1]
                        dq = i
                return self.array.pop(dq)

    def requeue(self, item, newPrio):
        """Changes specified item's priority."""
        for i in range(len(self.array)):
            if self.array[i][0] == item:
                self.array[i][1] = newPrio
                break

    def returnArray(self):
        """Returns array representation of the queue."""
        return self.array

    def __len__(self):
        """Returnes the length of the queue."""
        return self.count
2

这里有一个问题,导致无法实现真正的先进先出(FIFO)。
可以在这里查看详细信息。

如果能找到另一种方法来建立一个优先级队列的多进程设置,那就太好了!

6

唉,这个事情没有那么简单,不能仅仅通过改变老式的 Queue.Queue 的排队规则来解决。其实,这个队列是按照一种模板方法模式设计的,想要改变排队规则,只需要重写一些特定的方法 _put 和/或 _get,就能轻松实现(在2.6版本中,已经提供了明确的后进先出和优先级实现,但在更早的Python版本中也很容易做到)。

对于多进程的情况,尤其是有多个读者和多个写者的情况下,我看不到有什么办法可以实现优先级队列,除非放弃队列的分布式特性。可以指定一个特殊的辅助进程,专门处理队列,其他进程通过发送远程过程调用(RPC)给它来创建一个具有特定规则的队列,进行数据的添加和获取,获取队列的信息等等。这样就会出现一些常见的问题,比如确保每个进程都知道这个辅助进程的位置(比如主机和端口),等等(如果这个进程总是由主进程在启动时创建,那就简单多了)。这可是个大问题,特别是如果想要保证性能好,还要防止辅助进程崩溃(这就需要把数据复制到从属进程,进行分布式的“主选举”,如果主进程崩溃等等)。从头开始做这件事听起来就像是要做博士论文的工作。可以从 Johnson 的研究入手,或者借助一些非常通用的方法,比如 ActiveMQ

一些特殊情况(比如单个读者和单个写者)可能会简单一些,且在它们有限的应用范围内可能会更快;但这时就需要为这个有限的范围制定一个非常具体的规范——结果将不再是一个通用的“多进程队列”,而是仅适用于特定需求的队列。

撰写回答