带有主动移除旧消息的字典+队列数据结构

6 投票
2 回答
11261 浏览
提问于 2025-04-16 17:00

我想创建一个数据结构,用来表示一组队列(最好是像哈希表、映射或字典那样的查找方式),这些队列里的消息会在达到一定时间后被主动移除。这个时间限制(ttl)是全局的;消息不需要也没有单独的时间限制。对于这个时间限制的精确度要求不高,差不多在一秒内就可以了。

我甚至不知道该怎么搜索相关内容。我可以创建一个单独的全局队列,然后让一个后台线程来监控这个队列,查看并提取指向消息的指针,这样就可以告诉它从各个队列中移除项目,但这个过程需要双向进行。如果某个项目从某个队列中被移除,它也需要从全局队列中移除。

我希望这个数据结构能用Python实现,速度是最重要的(比起内存使用更重要)。有没有什么建议可以让我开始呢?

2 个回答

1

在你访问队列的时候,可以考虑检查一下TTL(生存时间),而不是用一个线程一直在那儿检查。我不太明白你说的hash/map/dict(关键是什么?),不过可以试试下面这样的做法:

import time
class EmptyException(Exception): pass
class TTLQueue(object):
    TTL = 60 # seconds
    def __init__(self):
        self._queue = []

    def push(self, msg):
        self._queue.append((time.time()+self.TTL, msg))

    def pop(self):
        self._queue = [(t, msg) for (t, msg) in self._queue if t > time.time()]
        if len(self._queue) == 0:
            raise EmptyException()
        return self._queue.pop(0)[1]

queues = [TTLQueue(), TTLQueue(), TTLQueue()]  # this could be a dict or set or
                                               #    whatever if I knew what keys
                                               #    you expected
4

我建议你先在一个类里简单地模拟一下你想要的行为,尽量让它简单明了。性能方面可以稍后再优化,只有在必要的时候才去做(你可能根本不需要优化)。

下面这个类大致上实现了你所描述的功能。队列其实就是一些有名字的列表,存储在一个字典里。每条消息都有时间戳,并且会被插入到列表的最前面(先进先出)。通过检查列表最后一条消息的时间戳,可以逐个取出消息,直到遇到一条超过设定时间的消息为止。

如果你打算从多个线程访问这个类,你需要添加一些细致的锁定机制,以便最大限度地提高性能。例如,reap() 方法应该一次只锁定一个队列,而不是锁定所有队列(方法级别的同步),所以你还需要为每个命名的队列保持一个锁。

更新 -- 现在使用一个全局的时间桶(按时间戳,精确到1秒)来跟踪哪些队列在那个时间点有消息。这减少了每次检查时需要查看的队列数量。

import time
from collections import defaultdict

class QueueMap(object):

    def __init__(self):
        self._expire = defaultdict(lambda *n: defaultdict(int))
        self._store = defaultdict(list)
        self._oldest_key = int(time.time())

    def get_queue(self, name):
        return self._store.get(name, [])

    def pop(self, name):
        queue = self.get_queue(name)
        if queue:
            key, msg = queue.pop()
            self._expire[key][name] -= 1
            return msg
        return None

    def set(self, name, message):
        key = int(time.time())
        # increment count of messages in this bucket/queue
        self._expire[key][name] += 1
        self._store[name].insert(0, (key, message))

    def reap(self, age):
        now = time.time()
        threshold = int(now - age)
        oldest = self._oldest_key

        # iterate over buckets we need to check
        for key in range(oldest, threshold + 1):
            # for each queue with items, expire the oldest ones
            for name, count in self._expire[key].iteritems():
                if count <= 0:
                    continue

                queue = self.get_queue(name)
                while queue:
                    if queue[-1][0] > threshold:
                        break
                    queue.pop()
            del self._expire[key]

        # set oldest_key for next pass
        self._oldest_key = threshold

用法:

qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print qm.pop('one')
print qm.get_queue('one')
print qm.get_queue('two')

# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print qm.get_queue('one')
print qm.get_queue('two')

撰写回答