Flask:后台线程将非空队列视为空队列
当我在uwsgi中运行Flask应用时,后台线程和应用函数在查询同一个队列的大小时看到的值不一样。
组成部分
- 一个带有线程安全队列的Flask应用。
- 一个
GET
请求返回队列的大小。 - 一个
POST
请求向队列中添加一个元素。 - 一个后台线程打印队列的大小。
问题
当我通过命令行用python tester.py
运行应用时,得到的结果是我预期的:
2014-06-07 14:20:50.677995 Queue size is: 0
127.0.0.1 - - [07/Jun/2014 14:20:51] "POST /addMessage/X HTTP/1.1" 200 -
2014-06-07 14:20:51.679277 Queue size is: 1
2014-06-07 14:20:52.680425 Queue size is: 1
2014-06-07 14:20:53.681566 Queue size is: 1
2014-06-07 14:20:54.682708 Queue size is: 1
127.0.0.1 - - [07/Jun/2014 14:20:55] "POST /addMessage/Y HTTP/1.1" 200 -
2014-06-07 14:20:55.687755 Queue size is: 2
2014-06-07 14:20:56.688867 Queue size is: 2
但是,当我使用uwsgi
执行应用时,日志中显示的是:
2014-06-07 14:17:42.056863 Queue size is: 0
2014-06-07 14:17:43.057952 Queue size is: 0
[pid: 9879|app: 0|req: 6/6] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun 7 14:17:43 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:44.059037 Queue size is: 0
2014-06-07 14:17:45.060118 Queue size is: 0
[pid: 9879|app: 0|req: 7/7] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun 7 14:17:45 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:46.061205 Queue size is: 0
2014-06-07 14:17:47.062286 Queue size is: 0
在uwsgi下运行时,后台线程看不到和应用相同的队列。这是为什么呢?我该如何让这两个线程查看同一个队列对象呢?
更新
- 即使作为Python脚本执行时,我也发现行为不一致:有时它无法记录消息(使用
app.logger
),我只能看到print
的输出。这意味着线程在运行,但它无法使用app.logger
做任何事情。
uwsgi .ini
配置
[uwsgi]
http-socket = :9002
plugin = python
wsgi-file = /home/ubuntu/threadtest-uwsgi.py
enable-threads = true
workers = 1
chdir = /home/ubuntu/thread-tester/thread_tester
代码
from flask import Flask, jsonify
import Queue
from threading import Thread
import time
import datetime
import logging
import sys
logging.basicConfig(stream=sys.stderr,
format='%(asctime)s %(levelname)s - %(message)s')
app = Flask(__name__)
messages = Queue.Queue()
def print_queue_size():
while True:
app.logger.debug("%s Queue size is: %d" % (datetime.datetime.now(),
messages.qsize()))
time.sleep(1)
t = Thread(target=print_queue_size, args=())
t.setDaemon(True)
t.start()
@app.route("/queueSize", methods=["GET"])
def get_queue_size():
return jsonify({"qsize": messages.qsize()}), 200
@app.route("/addMessage/<message>", methods=["POST"])
def add_message_to_queue(message):
messages.put(message)
return jsonify({"qsize": messages.qsize()}), 200
if __name__ == "__main__":
app.run(port=6000)
2 个回答
1
在@Martijn Pieters的回答中提到的文档链接里说,lazy-apps
可能会比预先分叉(preforking)消耗更多的内存。如果你对此有顾虑,可以考虑使用@postfork
这个装饰器,这样你就可以更细致地控制在分叉后运行哪些内容。你可以在一个用@postfork
装饰的函数里创建你的队列,这样每个工作进程里都会创建一个队列。
12
来自需要知道的文档页面:
uWSGI会尽量利用
fork()
这个调用的“写时复制”特性。默认情况下,它会在加载你的应用程序后进行分叉,这样可以尽量共享内存。如果你不想要这种行为,可以使用lazy
选项。这会告诉uWSGI在每个工作进程分叉后再加载应用程序。懒加载模式会改变优雅重载的方式:不是重载整个实例,而是一个一个地重载每个工作进程。如果你想要“懒加载应用”,但又想保持标准的uWSGI重载行为,从1.3版本开始,你可以使用lazy-apps
选项。
当uWSGI启动时,你的Flask应用也会启动,然后会有一个工作进程被分叉。在分叉时,Queue
对象是空的,并且不再与原来的进程共享。线程也不会被带过去。
试着设置lazy-apps
选项,这样可以延迟加载Flask应用,直到工作进程启动时再加载。