如何在Flask路由中使用线程
我有一个用Python写的Flask应用。想在处理某个特定请求时使用并发,但又不想在每次请求时都创建额外的线程。
这个请求的路由是这样定义的:
def sentence_numfound(path):
nf = util.NumFound(path)
return json.dumps(nf.results(path))
nf.results()需要发出多个HTTP请求才能返回结果,我希望这些请求能够并行进行。目前我这样做:
class NumFound:
def __init__(self, path):
queries = get_queries(path) # A list
self.__results = [{}] * len(queries)
self.queue = Queue.Queue()
for i, q in enumerate(queries):
self.queue.put((i, q))
def results(self):
num_workers = 31
for i in range(num_workers):
t = threading.Thread(target=self.worker)
t.daemon = True
t.start()
self.queue.join()
return self.__results
def worker(self):
while True:
i, q = self.queue.get()
self.__results[i] = foo(q)
self.queue.task_done()
问题是每次请求都会创建新的线程,而且没有办法关闭这些线程。最终,这个路由会因为Python无法再创建更多线程而出错。
有没有简单的方法可以重用这些线程?或者有没有其他方法可以实现并发?
1 个回答
4
我觉得你可以通过使用一个叫做 multiprocessing.Pool 的工具来接近你的实现。
你可以这样设置一个工作池:
from multiprocessing import Pool
pool = Pool(processes=31)
然后,你的路由只需要把任务提交给这个池子,并等待所有任务完成就行了。我无法测试这个,因为你没有提供足够的代码,但它可能看起来大致是这样的:
def sentence_numfound(path):
return jsonify(pool.map(foo, get_queries(path)))
这个基本上是在池子里的每个进程中并行调用 foo(query)
,对每个查询都执行一次。map()
这个调用会在所有任务完成后返回。返回的结果是一个数组,里面的结果顺序和输入的数组是一样的。
希望这对你有帮助!