如何让后端真正执行GAE Python的任何操作

2024-04-29 14:05:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在googleappengine中得到一个后端来运行来自taskqueue的任务。它运行时没有错误,但任务没有执行。我发现文件非常混乱。在

我的克罗恩·亚姆公司名称:

- description: backend test
  url: /send_to_backend
  schedule: every 2 minutes

我的应用程序yaml公司名称:

^{pr2}$

我的后端.yaml公司名称:

- name: backendtest
  class: B1

我的队列.yaml公司名称:

total_storage_limit: 500M
queue:
    - name: test
      rate: 1/s
      max_concurrent_requests: 1

我的手下主.py公司名称:

class BackendHandler(webapp2.RequestHandler):
    def get(self):
    taskqueue.add(url='/test', target='backendtest')

class TestHandler(webapp2.RequestHandler):
    def get(self):
        test.test()

在中执行实际工作的函数测试.py公司名称:

def test():
    company = Company()
    company.name = "Advanced Micro Devices, Inc"
    company.exchange = "NASDAQ"
    company.put()

AMD从未进入数据库,我很茫然。我这样做对吗?后端和任务队列是这样一起工作的吗?在


Tags: namepytest名称backendurlyaml队列
1条回答
网友
1楼 · 发布于 2024-04-29 14:05:59

是的,后端和任务队列是这样一起工作的。在

不幸的是,您没有发布完全可运行的示例,所以很难说您需要多少修复。您肯定需要的一个修复方法是在TestHandler中更改get->;post(队列任务处理是通过post完成的)

下面是示例的完全可运行和工作版本。别忘了“The development server doesn't automatically run your cron jobs”,所以在开发环境中用curl试试:

在应用程序yaml在

application: stackoverflow-21225722
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:

- url: /.*
  script: main.app
  login: admin

在后端.yaml在

^{pr2}$

在克罗恩·亚姆在

- description: backend test
  url: /send_to_backend
  schedule: every 2 minutes

在主.py在

from google.appengine.api import taskqueue
import test
import webapp2

class BackendHandler(webapp2.RequestHandler):
    def get(self):
      taskqueue.add(url='/test', target='backendtest')

class TestHandler(webapp2.RequestHandler):
    def post(self):
       test.test()

app = webapp2.WSGIApplication([
    ('/send_to_backend', BackendHandler),
    ('/test', TestHandler)
], debug=True)

在队列.yaml在

total_storage_limit: 500M
queue:
    - name: test
      rate: 1/s
      max_concurrent_requests: 1

在测试.py在

from google.appengine.ext import ndb

class Company(ndb.Model):
    name = ndb.StringProperty()
    exchange = ndb.StringProperty()

def test():
    company = Company()
    company.name = "Advanced Micro Devices, Inc"
    company.exchange = "NASDAQ"
    company.put()

相关问题 更多 >