如何在flask应用程序启动后自动命中端点?

2024-04-26 02:56:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个flask应用程序,它不断地检查来自AWS SQS队列的新消息并接收它们。为此,我编写了一个可以由端点触发的函数。-/start。为简单起见,在代码中,我没有发布从SQS队列接收消息的所有逻辑。只需打印datetime并等待3秒钟。你知道吗

当flask服务器启动并准备好为请求提供服务时,如何使这个端点被命中?你知道吗

有没有什么装饰师或什么东西使这成为可能?你知道吗

曲克_服务.py:

from datetime import datetime
import time
def receive_messages():
    print(datetime.now())
    time.sleep(3)

\uu初始化\uuuuuuuuuuuuuuuuuuuuuuy:

from flask import Flask, request, jsonify
from app.workload.services.queue_services import receive_messages

def create_app(**kwargs):
    app = Flask(__name__, **kwargs)

    @app.route('/start')
    def queue_receiver():
        while True:
            receive_messages()

    return app

wsgi.py公司:

from app import create_app

application = create_app()

if __name__ == "__main__":
    application.run()

Tags: frompyimportapp消息flaskdatetime队列
1条回答
网友
1楼 · 发布于 2024-04-26 02:56:41

您可以使用flask测试客户机在创建应用程序后向其发送请求。我已经将您的代码简化为一个独立的脚本,下面演示了这一点。你知道吗

from flask import Flask

def create_app(**kwargs):
    app = Flask(__name__, **kwargs)

    @app.route('/start')
    def queue_receiver():
        print("***Endpoint hit***")
        return ""

    return app

application = create_app()

if __name__ == "__main__":
    application.test_client().get('/start')
    print("***Starting server***")
    application.run()

如果这对您的代码不起作用,那么我怀疑这是因为您从未从/start端点返回。我假设在处理所有消息时您正在脱离while True循环,但为了简洁起见,没有包含代码。如果不是这样的话,那么可能有必要将应用程序拆分为一个单独的客户机和服务器,正如前面在注释中所建议的那样。你知道吗

相关问题 更多 >