跟踪每日配额(自动过期递增)- Redis还是Pymongo?

1 投票
3 回答
896 浏览
提问于 2025-04-18 14:59

这个问题其实很简单:我们有一个记录,每当某件事情发生时(比如说文件被下载的次数),我们想要保持一个下载次数的总数。但是,我们希望每次增加的下载次数在24小时后失效(也就是说,总数不会重置为零,但超过24小时的下载次数会从总数中消失)。

(另外,如果有办法让Redis中的increment操作自动失效,请现在就告诉我。)

现在的情况是,有一个小应用程序已经在使用Redis来跟踪一些不断更新的值,同时使用MongoDBPymongo进行长期存储。我打算对某些功能实施每日配额,这些配额可以在RedisMongoDB中进行跟踪。

但是Redis有个问题,就是它的结构比较简单,所以为了跟踪每个下载的失效情况(而不是总数),我们需要创建单独的项目:

cache.set('filename1.downloads.action_234612', {'downloads': 1)
cache.expire('filename1.downloads.action_234612', 86400)

然后,为了判断下载次数是否达到了每日限制,我可以统计缓存中以'user1.votes.action_*'为标识的对象数量。

我认为第二种选择是把投票记录放到一个pymongo集合中,并加上时间戳,然后忽略那些time.time() - download_timestamp > 86400的项目(不使用SQL只是因为已经在用MongoDB)。

我知道可能有更快的方法来实现这个,但我不确定性能差异是否值得去纠结这个问题。(至少从概念上看,似乎可以在不对项目集进行索引的情况下,仅对栈顶的项目进行操作。)

需要注意的是,如果服务器出现故障,系统会在启动时进行完整性检查,并重新计算文件被下载的次数(或者任何正在跟踪的增量)。

3 个回答

0

这个版本主要使用了 pandas.Series.shift。它设定了一个一分钟的时间间隔,但其实可以适用于任何时间间隔。

它依赖于一个日志,这个日志的大小是整个时间窗口的大小。比如说,对于24小时的时间窗口,这样是可以的(24*60个值,也就是1440个)。但是如果时间窗口更大,就会占用不少内存(30天的话就是43200个值)……

每次记录生成时,日志都会被移动,以便忘记那些太久之前的记录。

from datetime import datetime
import numpy as np
import pandas as pd # must be at least version 0.24

class QuotaExceededError(Exception): pass

class TimeWindowQuota():
    """
    Check if a quota has not been exceeded during a past given time window.

    IMPORTANT: the time window resolution is in minute. So any two records 
    occuring within the same minute of the current date will be counted as 
    one record.
    """

    def __init__(self, time_window_minutes, limit):
        """
        Args:
            - time_window_minutes is a positive integer 
            - limit is a number above which to raise QuotaExceededError
        """
        self.limit = limit
        self.journal = pd.Series(np.zeros(time_window_minutes))
        self.head_date = None # will be initialize at 1st record

    def record(self, quantity=1, date=None):
        """
        Record a quantity for a given date. Quantities recorded outside the past
        time window will be forgotten.

        IMPORTANT: the time window resolution is in minute. So any two records 
        occuring within the same minute of the current date will be counted as 
        one record.

        Args:
           - quantity : a number to record (can be negative)
           - date (datetime): date associated with the recording.
             Cannot be a date earlier than the previous call of record.
             Default is datetime.now().
        """
        if date is None:
            date = datetime.now()

        if self.head_date is not None:
            assert(date >= self.head_date) # cannot record in the past
            dt = (date - self.head_date).total_seconds() / 60.0
            self.journal = self.journal.shift(int(dt), fill_value=0.0)

        self.journal.loc[0] += quantity
        if self.journal.sum() >= self.limit:
            raise QuotaExceededError()

        self.head_date = date

if __name__ == '__main__':

    tracker = TimeWindowQuota(time_window_minutes=24*60, limit=100)
    tracker.record(quantity=90, date=datetime(2020,4,24,20,20,0))
    tracker.record(quantity=50, date=datetime(2020,4,25,20,20,0))
    tracker.record(quantity=20, date=datetime(2020,4,26,10,50,0))
    tracker.record(quantity=29, date=datetime(2020,4,26,20,19,0))
    # This will raise a QuotaExceededError:
    tracker.record(date=datetime(2020,4,26,20,19,0))
1

因为我没有足够的声望,所以不能对Itamar的答案进行评论,尽管他的答案非常有效。

我最近做过类似的事情,所以我对之前的解决方案只有两个小改动。当我往有序集合中添加一个项目时,我(以及其他人)并不需要成员项,最开始我也是像Itamar那样,用一个唯一的计数器来实现。

过了一段时间,我把它改成了:

r.zadd('filename1:downloads', time.time(), time.time())

这样成员和时间戳就变成了相同的(唯一的)值。

因为检查某个操作是否超出配额总是在用户尝试发起操作时进行的,所以我确保修剪(zremrangebyscore)覆盖了我感兴趣的整个时间窗口(86400秒),并使用了zcard()而不是zcount()。

再用伪代码表示一下:

def try_download(r, sorted_set_key, timestamp=time.time(), limit=1000, window=24 * 60 * 60):
    # trim current set
    r.zremrangebyscore(sorted_set_key, '-inf', timestamp - window)
    # how many items are there in the set?
    count = r.zcard(sorted_set_key)
    # too many?
    if count >= limit:
        return False
    # add new download
    r.zadd(sorted_set_key, timestamp, timestamp)
    # expire after window seconds
    r.expire(sorted_set_key, window)
    # return True meaning download allowed
    return True

在zcard和zadd之间存在竞争条件,这可以通过WATCH/MULTI/EXEC或者LUA脚本来解决。

1

我不能告诉你用MongoDB还是Redis更好,但我可以告诉你如果用Redis我会怎么做。

对于每个计数器,比如下载的文件,保持一个有序集合。这个有序集合的成员应该代表下载的动作,而它们的分数就是这个动作发生的时间戳。当你更新这个有序集合时,要把旧的项目删掉,并为整个集合设置一个过期时间。

添加一个下载到计数器的工作流程大概是这样的,用伪Python表示(注意,redis-py把成员和分数的顺序调换了):

r.zadd('filename1:downloads', 'action_234612', time.time())
r.zremrangebyscore('filename1:downloads', '-inf', time.time()-86400)
r.expire('filename1:downloads', 86400)

你可能想用MULTI/EXEC块来处理上面的操作,或者也可以使用服务器端的Lua脚本。

现在,繁重的工作已经完成,获取一个文件当前的下载次数(比如说在过去24小时内)就很简单了,只需要对这个键使用ZCOUNT命令(你可能也想在这里进行修剪):

downloads = r.zcount('filename1:downloads', time.time()-86400, time.time())

撰写回答