如何在Python GAE应用中实现远程FIFO队列?

1 投票
2 回答
728 浏览
提问于 2025-04-15 14:10

如何在Python GAE应用中实现一个简单的远程FIFO队列,并将名称-值对字典推送到队列中或从队列中拉取出来?

举个例子,当对GAE应用进行HTTP GET请求时,GAE应用会返回队列中最早的那些名称-值对,这些对之前没有被拉取过。这些名称-值对会在客户端重新变成一个字典。urllib.urlencode提供了一种简单的方法来将字典编码为参数,但当你通过HTTP“获取”这些参数时,有什么简单的方法可以将参数解码成字典呢?如果队列里没有任何项目,GAE应用应该返回一个空值或者其他更合适的标识,让客户端能够做出响应。

#A local python script
import urllib 
targetURL="http://myapp.appspot.com/queue"

#Push to dictionary to GAE queue
params = urllib.urlencode({'spam': 1, 'eggs': 2, 'bacon': 0})
f = urllib.urlopen(targetURL, params)
print f.read()
params = urllib.urlencode({'foo': 1, 'bar': 2})
f = urllib.urlopen(targetURL, params)
print f.read()


#Pull oldest set of name-value pairs from the GAE queue and create a local dictionary from them.
#f = urllib.urlopen(targetURL, ……)
#returnedDictionary = ????

实现这个简单GAE应用的最简单方法是什么?

#queue.py a url handler in a GAE application.  
# For posts, create an object from the posted name-value pairs and insert it into the queue as the newest item in the queue
# For gets, return the name-value pairs for the oldest object in the queue and remove the object from the queue.
#   If there are no items in the queue, return null

2 个回答

0

下面的内容是基于你正在使用一个网页应用框架的假设。

简单来说,你只需要使用 self.request.GET,这个东西叫做 MultiDict(在很多情况下你可以把它当作字典来用),里面包含了发送到请求的表单数据。

需要注意的是,HTTP 允许表单数据中同一个键出现多次;如果你想要的不是字典,而是发送到你应用程序的一系列键值对列表,你可以通过 self.request.GET.items() 来获取这样的列表(详细信息可以查看 http://pythonpaste.org/webob/reference.html#query-post-variables),然后把这些键值对添加到你的队列中。

3

大概是这样的:

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

class QueueHandler(webapp.RequestHandler):
  def get(self):
    """Pop a request off the queue and return it."""
    self.response.headers['Content-Type'] = 'application/x-www-form-urlencoded'
    task = QueueItem.pop()
    if not task:
      self.error(404)
    else:
      self.response.out.write(task.data)

  def post(self):
    """Add a request to the queue."""
    QueueItem.push(self.request.body)

有一点需要注意:因为队列的顺序是根据时间戳来决定的,所以如果在不同的机器上有任务几乎同时到达,它们可能会被放入队列的顺序搞乱。因为没有一个统一的时间标准(只有通过NFS同步的服务器)。不过,这可能并不是个大问题,具体还得看你的使用场景。

撰写回答