Flask:后台线程将非空队列视为空队列

11 投票
2 回答
2781 浏览
提问于 2025-04-18 08:58

当我在uwsgi中运行Flask应用时,后台线程和应用函数在查询同一个队列的大小时看到的值不一样。

组成部分

  • 一个带有线程安全队列的Flask应用。
  • 一个GET请求返回队列的大小。
  • 一个POST请求向队列中添加一个元素。
  • 一个后台线程打印队列的大小。

问题

当我通过命令行用python tester.py运行应用时,得到的结果是我预期的:

2014-06-07 14:20:50.677995 Queue size is: 0
127.0.0.1 - - [07/Jun/2014 14:20:51] "POST /addMessage/X HTTP/1.1" 200 -
2014-06-07 14:20:51.679277 Queue size is: 1
2014-06-07 14:20:52.680425 Queue size is: 1
2014-06-07 14:20:53.681566 Queue size is: 1
2014-06-07 14:20:54.682708 Queue size is: 1
127.0.0.1 - - [07/Jun/2014 14:20:55] "POST /addMessage/Y HTTP/1.1" 200 -
2014-06-07 14:20:55.687755 Queue size is: 2
2014-06-07 14:20:56.688867 Queue size is: 2

但是,当我使用uwsgi执行应用时,日志中显示的是:

2014-06-07 14:17:42.056863 Queue size is: 0
2014-06-07 14:17:43.057952 Queue size is: 0
[pid: 9879|app: 0|req: 6/6] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:43 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:44.059037 Queue size is: 0
2014-06-07 14:17:45.060118 Queue size is: 0
[pid: 9879|app: 0|req: 7/7] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:45 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:46.061205 Queue size is: 0
2014-06-07 14:17:47.062286 Queue size is: 0

在uwsgi下运行时,后台线程看不到和应用相同的队列。这是为什么呢?我该如何让这两个线程查看同一个队列对象呢?

更新

  • 即使作为Python脚本执行时,我也发现行为不一致:有时它无法记录消息(使用app.logger),我只能看到print的输出。这意味着线程在运行,但它无法使用app.logger做任何事情。

uwsgi .ini 配置

[uwsgi]
http-socket    = :9002
plugin         = python
wsgi-file      = /home/ubuntu/threadtest-uwsgi.py
enable-threads = true
workers        = 1
chdir          = /home/ubuntu/thread-tester/thread_tester

代码

from flask import Flask, jsonify
import Queue
from threading import Thread
import time
import datetime
import logging
import sys

logging.basicConfig(stream=sys.stderr,
                    format='%(asctime)s %(levelname)s - %(message)s')

app = Flask(__name__)
messages = Queue.Queue()

def print_queue_size():
    while True:
        app.logger.debug("%s Queue size is: %d" % (datetime.datetime.now(),
                                        messages.qsize()))
        time.sleep(1)

t = Thread(target=print_queue_size, args=())
t.setDaemon(True)
t.start()

@app.route("/queueSize", methods=["GET"])
def get_queue_size():
    return jsonify({"qsize": messages.qsize()}), 200

@app.route("/addMessage/<message>", methods=["POST"])
def add_message_to_queue(message):
    messages.put(message)
    return jsonify({"qsize": messages.qsize()}), 200

if __name__ == "__main__":
    app.run(port=6000)

2 个回答

1

在@Martijn Pieters的回答中提到的文档链接里说,lazy-apps可能会比预先分叉(preforking)消耗更多的内存。如果你对此有顾虑,可以考虑使用@postfork这个装饰器,这样你就可以更细致地控制在分叉后运行哪些内容。你可以在一个用@postfork装饰的函数里创建你的队列,这样每个工作进程里都会创建一个队列。

12

来自需要知道的文档页面

uWSGI会尽量利用fork()这个调用的“写时复制”特性。默认情况下,它会在加载你的应用程序后进行分叉,这样可以尽量共享内存。如果你不想要这种行为,可以使用lazy选项。这会告诉uWSGI在每个工作进程分叉后再加载应用程序。懒加载模式会改变优雅重载的方式:不是重载整个实例,而是一个一个地重载每个工作进程。如果你想要“懒加载应用”,但又想保持标准的uWSGI重载行为,从1.3版本开始,你可以使用lazy-apps选项。

当uWSGI启动时,你的Flask应用也会启动,然后会有一个工作进程被分叉。在分叉时,Queue对象是空的,并且不再与原来的进程共享。线程也不会被带过去。

试着设置lazy-apps选项,这样可以延迟加载Flask应用,直到工作进程启动时再加载。

撰写回答