multiprocessing.Process does not terminate after putting requests response.content on a Queue

Posted 2024-06-02 18:01:08


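I'm trying to run multiple API requests in parallel using multiprocessing.Process and requests. I put the URLs to be fetched into a JoinableQueue instance and put the response content back into a Queue instance. I've noticed that response.content somehow prevents the process from terminating.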

Below is a simplified example with only one process (Python 3.5):

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, qin, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())

After running the code, I get:

Infinite loop terminates

ChildProcess-1 True

Please help me understand why the process does not terminate after its run() method has returned.

Update: added a print statement to show that the loop terminates.


3 answers

Answer 1: As stated in the Pipes and Queues documentation:

if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

...

Note that a queue created using a manager does not have this issue.
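The behavior that passage describes can be reproduced without any network access. This is a sketch under stated assumptions, not the asker's code: a plain function target replaces ChildProcess, and a 1 MiB bytes object (the hypothetical PAYLOAD_SIZE) stands in for r.content, chosen only because it is larger than the OS pipe buffer behind mp.Queue. The child's target returns almost immediately, yet the process stays alive until the parent reads the payload off the queue.

```python
import multiprocessing as mp

# Assumption: a 1 MiB bytes object stands in for r.content. What matters is
# that it is larger than the OS pipe buffer behind mp.Queue.
PAYLOAD_SIZE = 1024 * 1024


def producer(qout):
    # put() hands the object to the queue's background feeder thread and
    # returns at once, so this function finishes almost immediately.
    qout.put(b"x" * PAYLOAD_SIZE)


def demo_blocked_exit():
    """Return (alive_before_drain, alive_after_drain)."""
    qout = mp.Queue()
    p = mp.Process(target=producer, args=(qout,))
    p.start()
    # producer() has returned, but at exit the child joins its feeder
    # thread, which is stuck until someone reads from the pipe.
    p.join(timeout=2)
    alive_before = p.is_alive()
    data = qout.get(timeout=30)  # drain the queue so the feeder can finish
    p.join()                     # the child can now exit normally
    assert len(data) == PAYLOAD_SIZE
    return alive_before, p.is_alive()


if __name__ == "__main__":
    print(demo_blocked_exit())  # (True, False)
```

The first join() with a timeout only observes the stuck child; the second join() succeeds because the parent has consumed what the child put.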

If you switch to a manager queue, the process terminates successfully:

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, qin, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    manager = mp.Manager()
    qin = mp.JoinableQueue()
    qout = manager.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())
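For comparison, here is a network-free sketch of the manager-queue variant, again assuming a 1 MiB payload in place of r.content. A manager queue lives in the manager's server process, and put() on its proxy is a round trip to that process, so the child leaves nothing buffered locally and can exit before anyone reads the result. Manager() is used as a context manager so the server process is shut down afterwards.

```python
import multiprocessing as mp

# Assumption: a 1 MiB bytes object stands in for r.content.
PAYLOAD_SIZE = 1024 * 1024


def producer(qout):
    # qout is a proxy: put() sends the object to the manager's server
    # process and waits for the reply, so nothing stays buffered here.
    qout.put(b"x" * PAYLOAD_SIZE)


def demo_manager_queue():
    """Return (child_alive_before_any_read, payload_length)."""
    with mp.Manager() as manager:        # shuts the manager down on exit
        qout = manager.Queue()
        p = mp.Process(target=producer, args=(qout,))
        p.start()
        p.join(timeout=10)               # nobody has read qout yet
        alive = p.is_alive()
        data = qout.get(timeout=10)      # the item is held by the manager
        return alive, len(data)


if __name__ == "__main__":
    print(demo_manager_queue())  # (False, 1048576)
```

The trade-off is that every put() and get() now goes through the manager process, which adds a round trip per item.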

Answer 2: Add a call to w.terminate() just above the print statement.


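As for why the process does not terminate on its own: your run function is an infinite loop, so it never returns. Calling terminate() tells the process to stop (on POSIX it sends SIGTERM).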
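One caveat with terminate(): it does not wait for the child, so is_alive() can still report True right afterwards unless join() is called first. Killing the child also discards anything still buffered in its queue, and the multiprocessing documentation warns that a pipe or queue in use by a terminated process may become corrupted. A minimal offline sketch of the terminate-then-join sequence, assuming a 1 MiB payload in place of r.content:

```python
import multiprocessing as mp

# Assumption: a 1 MiB bytes object stands in for r.content.
PAYLOAD_SIZE = 1024 * 1024


def producer(qout):
    qout.put(b"x" * PAYLOAD_SIZE)


def demo_terminate():
    """Return (alive_after_join, exitcode) for a child stuck on a full queue."""
    qout = mp.Queue()
    p = mp.Process(target=producer, args=(qout,))
    p.start()
    p.join(timeout=2)   # times out: the child is waiting to flush qout
    p.terminate()       # SIGTERM on POSIX, TerminateProcess() on Windows
    p.join()            # terminate() does not wait, so join() before checking
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    # On POSIX the exit code is -15 (killed by SIGTERM); the payload is lost.
    print(demo_terminate())
```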

Answer 3: This is a bit hard to work out from the Queue documentation; I ran into the same problem myself.

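The key concept here is that before a producer process exits, it joins the background feeder thread of every queue it has put data on, and that thread only finishes once everything it buffered has been written into the queue's underlying pipe. A pipe holds only a limited amount of data (64 KiB by default on Linux), so a large object such as r.content cannot be fully written until another process reads from the queue. So, basically, before your ChildProcess can exit, someone has to consume everything it put on the queue!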

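There is some documentation on the Queue.cancel_join_thread function, which should avoid this problem, but I couldn't get it to have any effect; maybe I wasn't using it correctly.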
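For reference, cancel_join_thread() only affects the process that calls it, so it has to be called inside the child, the process that puts data on the queue, not in the parent. The documentation is explicit that it is likely to cause enqueued data to be lost, so it only fits cases where the child's results can be discarded. A minimal offline sketch under that assumption, with a 1 MiB payload standing in for r.content:

```python
import multiprocessing as mp

# Assumption: a 1 MiB bytes object stands in for r.content.
PAYLOAD_SIZE = 1024 * 1024


def producer(qout):
    # Must run in the child (the process that puts data). It lets this
    # process exit without waiting for the feeder thread to flush, so
    # the payload may never reach the parent.
    qout.cancel_join_thread()
    qout.put(b"x" * PAYLOAD_SIZE)


def demo_cancel_join_thread():
    """Return (alive, exitcode) without ever reading from qout."""
    qout = mp.Queue()
    p = mp.Process(target=producer, args=(qout,))
    p.start()
    p.join(timeout=10)
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(demo_cancel_join_thread())  # (False, 0)
```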

Here is how you could modify your main block:

if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    while True:
        try:
            qout.get(True, 0.1)     # Throw away whatever remains in qout (or process it, or whatever;
                                    # just get it out of the queue so the queue's background feeder thread
                                    # can finish, which lets your ChildProcess terminate).
        except queue.Empty:
            break
    w.join()                # Wait for your ChildProcess to finish up.
    # time.sleep(1)         # Not necessary since we've joined the ChildProcess
    print(w.name, w.is_alive())
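A variation on the same idea, sketched offline with a hypothetical fake_fetch() in place of requests.get(url).content: read exactly one result per URL before joining, rather than draining until a short timeout expires. The sketch also swaps the non-blocking get(block=False) for a blocking get() plus a None sentinel, because the multiprocessing documentation notes that right after an object is put on an empty queue there may be a short delay before get_nowait() stops raising queue.Empty, so a freshly started worker could quit before it sees any URL.

```python
import multiprocessing as mp

# Assumption: each result is ~256 KiB, larger than a typical pipe buffer.
PAYLOAD_SIZE = 256 * 1024


def fake_fetch(url):
    # Hypothetical stand-in for requests.get(url).content so this runs offline.
    return url.encode() + b"x" * PAYLOAD_SIZE


def worker(qin, qout):
    # Blocking get() with a None sentinel instead of get(block=False).
    for url in iter(qin.get, None):
        try:
            qout.put(fake_fetch(url))
        finally:
            qin.task_done()
    qin.task_done()  # account for the sentinel itself


def run(urls):
    """Fetch every URL in one worker; return (results, worker_alive)."""
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for url in urls:
        qin.put(url)
    qin.put(None)  # sentinel: no more work
    w = mp.Process(target=worker, args=(qin, qout))
    w.start()
    # Read exactly one result per URL before joining anything, so the
    # worker's feeder thread can flush and the worker can exit.
    results = [qout.get(timeout=30) for _ in urls]
    qin.join()
    w.join()
    return results, w.is_alive()


if __name__ == "__main__":
    results, alive = run(["http://example.com/a", "http://example.com/b"])
    print(len(results), alive)  # 2 False
```

With a single worker the results come back in the order the URLs were queued; with several workers they would arrive in completion order.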
