如何实现支持命名空间的FIFO队列

2 投票
2 回答
978 浏览
提问于 2025-04-16 05:42

我正在使用一种方法来处理一个基于Google App Engine的FIFO队列,这种队列是通过db.Model实现的(可以查看这个问题)。

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

这个队列运行得很好(非常好)。

现在我的代码中有一个方法,可以通过一个延迟队列来访问这个FIFO队列:

def deferred_worker():
        data= QueueItem.pop()
        do_something_with(data)

我想增强这个方法和队列的数据结构,添加一个client_ID参数,代表需要访问自己队列的特定客户。大概是这样的:

def deferred_worker(client_ID):
        data= QueueItem_of_this_client_ID.pop() # I need to implement this
        do_something_with(data)

我该如何让这个队列能够识别client_ID呢?

限制条件:
- 客户的数量是动态的,并不是预先定义好的
- 任务队列不是一个选项(1. 最多十个队列 2. 我想完全控制我的队列)

你知道我如何使用新的命名空间API来添加这个功能吗?(记住,我不是从webapp.RequestHandler调用db.Model)
另一个选项是:我可以在QueueItem中添加一个client_ID db.StringProperty,并在拉取方法中使用它作为过滤器:

QueueItem.all(keys_only=True).filter(client_ID=an_ID).order('created').fetch(10)

有没有更好的主意?

2 个回答

1

正如我在你问我问题时提到的,你不需要做任何额外的事情来让这个和命名空间一起工作:这个队列是建立在数据存储上的,而数据存储本身已经支持命名空间了。你只需要按照说明设置你想要的命名空间就可以了,具体可以参考文档

1

假设你的“客户端类”实际上是一个客户端调用的请求处理器,你可以这样做:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
       # If there is a user, use their queue.  Otherwise the global queue.
       set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')

另外,你也可以设置一个全局的命名空间,具体可以参考这个链接:app-wide

通过设置默认的命名空间,所有对数据存储的调用都会在这个命名空间内进行,除非你特别指定其他的命名空间。需要注意的是,要获取和运行任务,你必须知道命名空间是什么。所以,你可能需要在默认命名空间中维护一个命名空间的列表,以便于清理。

撰写回答