Python multiprocessing Pool: some processes deadlock when forked but run fine when spawned

Posted 2024-04-25 05:30:38

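I'm trying to download and resize images with a small service: threads download the images and processes resize them. I start the download threads (with a manager thread that watches them), and as soon as an image is saved locally I add its path to a queue. Once all images have been downloaded, the manager thread adds a poison pill to the queue.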

Meanwhile, the main thread watches the queue, takes each path off it as it arrives, and starts a new async task in a pool to resize that image.

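At the end, when I try to join the pool, it sometimes hangs in what looks like a deadlock. It doesn't happen every time, but the more URLs there are in the IMG_URLS list, the more often it happens. When the deadlock occurs, the logs show that some processes either did not start properly or deadlocked immediately, because the "resizing {file}" log line never appears for them.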

import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")


class Service(object):
    def __init__(self):
        self.img_queue = Queue()

    def download_image(self, url) -> None:
        logging.info(f"downloading image from URL {url}")
        time.sleep(1)
        file = f"local-{url}"
        self.img_queue.put(file)
        logging.info(f"image saved to {file}")

    def download_images(self, img_url_list: list):
        logging.info("beginning image downloads")

        threads = []
        for url in img_url_list:
            t = Thread(target=self.download_image, args=(url,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()
        logging.info("all images downloaded")
        self.img_queue.put(None)

    def resize_images(self):
        logging.info("beginning image resizing")
        with mp.Pool() as p:
            while True:
                file = self.img_queue.get()
                if file is None:
                    logging.info("got SENTINEL")
                    break
                logging.info(f"got {file}")
                p.apply_async(func=resize_image, args=(file,))
            p.close()
            p.join()
        logging.info("all images resized")

    def run(self, img_url_list):
        logging.info("START service")

        dl_manager_thread = Thread(target=self.download_images, args=(img_url_list,))
        dl_manager_thread.start()
        self.resize_images()

        logging.info(f"END service")


if __name__ == "__main__":
    FORMAT = "[%(threadName)s, %(asctime)s, %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)

    IMG_URLS = list(range(8))

    service = Service()
    service.run(IMG_URLS)

When I run this with Python 3.8.5 (Ubuntu 20.04, Ryzen 2600), I get the following output:

[MainThread, 2020-11-30 19:58:01,257, INFO] START service
[Thread-1, 2020-11-30 19:58:01,257, INFO] beginning image downloads
[MainThread, 2020-11-30 19:58:01,257, INFO] beginning image resizing
[Thread-2, 2020-11-30 19:58:01,258, INFO] downloading image from URL 0
[Thread-3, 2020-11-30 19:58:01,258, INFO] downloading image from URL 1
[Thread-4, 2020-11-30 19:58:01,258, INFO] downloading image from URL 2
[Thread-5, 2020-11-30 19:58:01,259, INFO] downloading image from URL 3
[Thread-6, 2020-11-30 19:58:01,260, INFO] downloading image from URL 4
[Thread-7, 2020-11-30 19:58:01,260, INFO] downloading image from URL 5
[Thread-8, 2020-11-30 19:58:01,261, INFO] downloading image from URL 6
[Thread-9, 2020-11-30 19:58:01,262, INFO] downloading image from URL 7
[Thread-2, 2020-11-30 19:58:02,259, INFO] image saved to local-0
[MainThread, 2020-11-30 19:58:02,260, INFO] got local-0
[Thread-3, 2020-11-30 19:58:02,260, INFO] image saved to local-1
[Thread-4, 2020-11-30 19:58:02,260, INFO] image saved to local-2
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-1
[MainThread, 2020-11-30 19:58:02,261, INFO] resizing local-0
[Thread-5, 2020-11-30 19:58:02,261, INFO] image saved to local-3
[Thread-6, 2020-11-30 19:58:02,261, INFO] image saved to local-4
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-2
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-3
[MainThread, 2020-11-30 19:58:02,262, INFO] resizing local-1
[Thread-7, 2020-11-30 19:58:02,262, INFO] image saved to local-5
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-3
[Thread-8, 2020-11-30 19:58:02,263, INFO] image saved to local-6
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-6
[MainThread, 2020-11-30 19:58:02,264, INFO] resizing local-6
[Thread-9, 2020-11-30 19:58:02,264, INFO] image saved to local-7
[MainThread, 2020-11-30 19:58:02,265, INFO] got local-7
[Thread-1, 2020-11-30 19:58:02,265, INFO] all images downloaded
[MainThread, 2020-11-30 19:58:02,265, INFO] got SENTINEL
[MainThread, 2020-11-30 19:58:02,265, INFO] resizing local-7
[MainThread, 2020-11-30 19:58:02,362, INFO] done resizing local-0
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-1
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-3
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-4
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-5
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-6
[MainThread, 2020-11-30 19:58:02,366, INFO] done resizing local-7

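Sometimes it hangs at this point. Note that the "resizing local-2" log line is missing, so that process either never started or is waiting on something.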

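If I change the pool to use spawn instead of fork, it works fine. My guess is that in some cases fork copies a lock while it is in the acquired state, and that is the problem, but I can't see where or why.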

with mp.get_context("spawn").Pool() as p:

Any ideas?


2 Answers

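Sometimes (when you're unlucky), while your pool is spinning up, one of the child processes gets "fork"ed at the exact moment a download thread is writing a message through the logging module. The logging module guards message handling with a lock, so when the "fork" happens, that lock can be copied in the locked state. When the download thread then finishes writing its message, only the lock in the main process is released, and you are left with a child process waiting on its own copy of that lock before it can log anything. That copy can never be released, because the downloader thread does not exist in the child (fork does not copy threads). That is the deadlock. This type of bug can be patched in several ways, but it is one of the reasons "spawn" exists.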
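The mechanism described above can be reproduced without logging at all. The following is a minimal, POSIX-only sketch rather than the questioner's code: a hypothetical helper, `child_can_acquire_after_fork`, holds a plain `threading.Lock` in a background thread, calls `os.fork()`, and lets the child try to take its copy of the lock with a timeout. The helper name and the timeout value are assumptions made for illustration.

```python
import os
import threading


def child_can_acquire_after_fork(timeout=0.5):
    """Fork while another thread holds a lock; report whether the child got it."""
    lock = threading.Lock()
    holding = threading.Event()
    release = threading.Event()

    def holder():
        # Plays the role of the download thread that is busy logging.
        with lock:
            holding.set()
            release.wait()

    t = threading.Thread(target=holder)
    t.start()
    holding.wait()  # make sure the lock is held at the moment of the fork

    pid = os.fork()
    if pid == 0:
        # Child: the holder thread was not copied, but the locked lock was.
        status = 1
        try:
            if lock.acquire(timeout=timeout):
                status = 0
        finally:
            os._exit(status)

    # Parent: releasing here does not affect the child's copy of the lock.
    release.set()
    t.join()
    _, wait_status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(wait_status) == 0


if __name__ == "__main__":
    print(child_can_acquire_after_fork())
```

In this sketch the child gives up after the timeout. A logging call inside a pool worker has no such timeout, so it would simply block, which matches the hang seen at `p.join()`.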

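Additionally, "spawn" is the only start method supported on all platforms. It is far too easy to use a library that happens to be multithreaded under the hood without realizing it, and "fork" is not really multithreading-friendly. I don't know much about "forkserver", in case you really do need the reduced overhead that "fork" provides. In theory it is somewhat safer with multithreading.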
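As a rough illustration of choosing a start method explicitly, the sketch below picks the first available of "forkserver" and "spawn" and returns the matching context. The helper name `pick_safe_context` and the preference order are assumptions for this example. `multiprocessing.get_all_start_methods` and `multiprocessing.get_context` are the standard-library calls it relies on.

```python
import multiprocessing as mp


def pick_safe_context(preferred=("forkserver", "spawn")):
    """Return a multiprocessing context that avoids forking the threaded parent."""
    available = mp.get_all_start_methods()
    for name in preferred:
        if name in available:
            # get_context() leaves the global default start method untouched.
            return mp.get_context(name)
    raise RuntimeError(f"none of {preferred} is available on this platform")
```

In the question's code this would replace `mp.Pool()` with `pick_safe_context().Pool()`. With either method the worker function must be importable by the child, so the pool should still be created under the `if __name__ == "__main__":` guard.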

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.
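One way to act on the warning in the quoted documentation, if "fork" has to stay, is to create the pool before the program starts any threads of its own, so the workers are forked while no other thread can be holding a lock. The sketch below is a simplified stand-in for the question's service, not a drop-in fix: `resize_image`, `download_images`, and `run` are placeholder names, the downloads are faked, and the "fork" context is POSIX-only.

```python
import multiprocessing as mp
from queue import Queue
from threading import Thread


def resize_image(file):
    # Stand-in for the real resize step.
    return f"resized-{file}"


def download_images(urls, img_queue):
    # Stand-in for the download threads: enqueue paths, then the sentinel.
    for url in urls:
        img_queue.put(f"local-{url}")
    img_queue.put(None)


def run(urls, workers=2):
    img_queue = Queue()
    # Fork the workers first, before this code starts any thread.
    with mp.get_context("fork").Pool(workers) as pool:
        downloader = Thread(target=download_images, args=(urls, img_queue))
        downloader.start()
        pending = []
        while True:
            file = img_queue.get()
            if file is None:
                break
            pending.append(pool.apply_async(resize_image, (file,)))
        downloader.join()
        # Collect results before the pool is torn down on leaving the block.
        return [job.get(timeout=30) for job in pending]
```

This ordering only helps while the pool keeps its original workers. If a worker exits and the pool forks a replacement later (for example with `maxtasksperchild` set), the same race can reappear.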

Here's a more in-depth discussion of this issue, with several references, which I used as my main resource.

Just some extra information to expand on Aaron's great answer.

This Python bug/enhancement request appears to describe exactly the same problem: https://bugs.python.org/issue6721
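For locks that your own code owns, Python 3.7 and later offer `os.register_at_fork`, which can replace a lock with a fresh one in the child right after a fork. The sketch below shows that idea under stated assumptions: `_lock`, `_reinit_lock_in_child`, and `child_acquires_after_fork` are hypothetical names, the code is POSIX-only, and it does not change how logging or any other library handles its own locks.

```python
import os
import threading

_lock = threading.Lock()


def _reinit_lock_in_child():
    # Runs in the child right after fork: discard the inherited lock state.
    global _lock
    _lock = threading.Lock()


os.register_at_fork(after_in_child=_reinit_lock_in_child)


def child_acquires_after_fork(timeout=0.5):
    """Fork while a thread holds _lock; report whether the child could take it."""
    holding = threading.Event()
    release = threading.Event()

    def holder():
        with _lock:
            holding.set()
            release.wait()

    t = threading.Thread(target=holder)
    t.start()
    holding.wait()  # the lock is held when the fork happens

    pid = os.fork()
    if pid == 0:
        # Child: _lock now names the fresh lock created by the hook.
        status = 1
        try:
            if _lock.acquire(timeout=timeout):
                status = 0
        finally:
            os._exit(status)

    release.set()
    t.join()
    _, wait_status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(wait_status) == 0
```

Replacing the lock is only safe when the data it protects is still consistent in the child, which the hook cannot guarantee on its own.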

I found the same problem in another question: Deadlock with logging multiprocess/multithread python script
