从Flask返回HTTP状态码而不返回

2024-04-28 06:23:26 发布

您现在位置:Python中文网/ 问答频道 /正文

上下文

  • 我有一个名为"server.py"的服务器,它作为GitLab的提交后webhook运行。
  • "server.py"内,有一个长时间运行的进程(~40秒)

SSCCE公司

#!/usr/bin/env python

import time
from flask import Flask, abort, jsonify

debug = True

app = Flask(__name__)


@app.route("/", methods=['POST'])
def compile_metadata():
    # the long running process...
    time.sleep(40)
    # end the long running process
    return jsonify({"success": True})

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8082, debug=debug, threaded=True)

问题陈述

GitLab的webhooks希望返回的代码能够很快返回。因为我的webhook在40秒后或大约40秒后返回;GitLab发送一个重试消息,以循环方式发送我的长时间运行的进程,直到GitLab尝试了太多次。

问题

我是否能够将状态代码从Flask返回到GitLab,但仍然运行我的长时间运行的进程?

我试着加上一些东西,比如:

...
def compile_metadata():
    abort(200)
    # the long running process
    time.sleep(40)

但是abort()只支持失败代码。

我也试过使用@after_this_request

@app.route("/", methods=['POST'])
def webhook():
    @after_this_request
    def compile_metadata(response):
        # the long running process...
        print("Starting long running process...")
        time.sleep(40)
        print("Process ended!")
        # end the long running process
    return jsonify({"success": True})

通常,flask只从python的return语句返回一个状态代码,但是在长时间运行的进程之前,我显然不能使用它,因为它将从函数中转义。

注意:我实际上没有在代码中使用time.sleep(40)。这只为子孙后代和圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚圣母玛利亚。它将返回相同的结果


Tags: the代码trueappflasktime进程def
3条回答

compile_metadata生成一个线程来处理长时间运行的任务,然后立即返回结果代码(即,不等待线程完成)。请确保对可以生成的同时线程的数量有一些限制。

对于稍微更健壮和可伸缩的解决方案,请考虑一些基于排序消息队列的解决方案,如celery

据记录,一个简单的解决方案可能是:

import time
import threading
from flask import Flask, abort, jsonify

debug = True

app = Flask(__name__)

def long_running_task():
    print 'start'
    time.sleep(40)
    print 'finished'

@app.route("/", methods=['POST'])
def compile_metadata():
    # the long running process...
    t = threading.Thread(target=long_running_task)
    t.start()
    # end the long running process
    return jsonify({"success": True})

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8082, debug=debug, threaded=True)

当您想快速地从服务器返回响应,并且仍然需要做一些耗时的工作时,通常应该使用某种共享存储(如Redis)来快速存储您需要的所有内容,然后返回状态代码。所以请求很快就得到了满足。

并有一个单独的服务器例行工作,该语义作业队列做耗时的工作。完成工作后,将作业从队列中移除。也许最终的结果也会存储在共享存储中。这是正常的方法,而且它的规模非常大。例如,如果作业队列增长太快,单个服务器无法跟上,则可以添加更多服务器来处理该共享队列。

但即使您不需要可伸缩性,它也是一个非常简单的设计,便于理解、实现和调试。如果您在请求负载中遇到意外的峰值,这就意味着您的独立服务器可能整晚都在运行。而且您可以放心,如果您的服务器关闭,您不会丢失任何未完成的工作,因为它们在共享存储中是安全的。

但是如果你让一个服务器做所有的事情,在后台异步执行长时间运行的任务,我想可能只是确保后台工作是这样进行的:

------------ Serving Responses
    ----     Background Work

不是这样的:

----    ---- Serving Responses
    ----     Background Work

否则,如果服务器在后台执行某个工作块,则可能对新请求没有响应,这取决于耗时的工作需要多长时间(即使在请求负载很小的情况下)。但如果客户超时并重试,我认为您仍然可以安全地执行双重工作。但你不可能不失去未完成的工作。

我可以通过使用multiprocessing.dummy.Pool来实现这一点。在使用threading.Thread之后,它被证明是无用的,因为烧瓶仍然会等待线程完成(即使使用t.daemon = True

我在长时间运行的任务之前返回了一个状态代码,结果如下:

#!/usr/bin/env python

import time
from flask import Flask, jsonify, request
from multiprocessing.dummy import Pool

debug = True

app = Flask(__name__)
pool = Pool(10)


def compile_metadata(data):
    print("Starting long running process...")
    print(data['user']['email'])
    time.sleep(5)
    print("Process ended!")


@app.route('/', methods=['POST'])
def webhook():
    data = request.json
    pool.apply_async(compile_metadata, [data])
    return jsonify({"success": True}), 202


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8082, debug=debug, threaded=True)

相关问题 更多 >