如何在Flask路由中使用线程

2 投票
1 回答
3823 浏览
提问于 2025-04-18 03:17

我有一个用Python写的Flask应用。想在处理某个特定请求时使用并发,但又不想在每次请求时都创建额外的线程。

这个请求的路由是这样定义的:

def sentence_numfound(path):
    nf = util.NumFound(path)
    return json.dumps(nf.results(path))

nf.results()需要发出多个HTTP请求才能返回结果,我希望这些请求能够并行进行。目前我这样做:

class NumFound:
    def __init__(self, path):
        queries = get_queries(path) # A list
        self.__results = [{}] * len(queries)
        self.queue = Queue.Queue()
        for i, q in enumerate(queries):
            self.queue.put((i, q))

    def results(self):
        num_workers = 31
        for i in range(num_workers):
            t = threading.Thread(target=self.worker)
            t.daemon = True
            t.start()
        self.queue.join()
        return self.__results

    def worker(self):
        while True:
            i, q = self.queue.get()
            self.__results[i] = foo(q)
            self.queue.task_done()

问题是每次请求都会创建新的线程,而且没有办法关闭这些线程。最终,这个路由会因为Python无法再创建更多线程而出错。

有没有简单的方法可以重用这些线程?或者有没有其他方法可以实现并发?

1 个回答

4

我觉得你可以通过使用一个叫做 multiprocessing.Pool 的工具来接近你的实现。

你可以这样设置一个工作池:

from multiprocessing import Pool
pool = Pool(processes=31)

然后,你的路由只需要把任务提交给这个池子,并等待所有任务完成就行了。我无法测试这个,因为你没有提供足够的代码,但它可能看起来大致是这样的:

def sentence_numfound(path):
    return jsonify(pool.map(foo, get_queries(path)))

这个基本上是在池子里的每个进程中并行调用 foo(query),对每个查询都执行一次。map() 这个调用会在所有任务完成后返回。返回的结果是一个数组,里面的结果顺序和输入的数组是一样的。

希望这对你有帮助!

撰写回答