Python脚本中的多进程/多线程死锁与日志记录

7 投票
3 回答
8143 浏览
提问于 2025-04-18 11:43

我在使用下面这个脚本收集日志时遇到了问题。当我把 SLEEP_TIME 设置得太“短”时,LoggingThread 线程似乎会阻塞日志模块。这个脚本在 action 函数的日志请求时就卡住了。如果 SLEEP_TIME 设置为大约 0.1,脚本就能正常收集到我想要的所有日志信息。

我试着按照 这个回答 去做,但并没有解决我的问题。

import multiprocessing
import threading
import logging
import time

SLEEP_TIME = 0.000001

logger = logging.getLogger()

ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(): %(message)s'))
ch.setLevel(logging.DEBUG)

logger.setLevel(logging.DEBUG)
logger.addHandler(ch)


class LoggingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            logger.debug('LoggingThread: {}'.format(self))
            time.sleep(SLEEP_TIME)


def action(i):
    logger.debug('action: {}'.format(i))


def do_parallel_job():

    processes = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=processes)
    for i in range(20):
        pool.apply_async(action, args=(i,))
    pool.close()
    pool.join()



if __name__ == '__main__':

    logger.debug('START')

    #
    # multithread part
    #
    for _ in range(10):
        lt = LoggingThread()
        lt.setDaemon(True)
        lt.start()

    #
    # multiprocess part
    #
    do_parallel_job()

    logger.debug('FINISH')

如何在多进程和多线程的脚本中使用日志模块?

3 个回答

0

默认情况下,在Linux上使用multiprocessing时,它会通过fork()来复制进程。这意味着新创建的子进程会丢失所有正在运行的线程,只有主线程会保留。所以如果你在Linux上,这就是问题所在。

第一件要做的事是:你永远不应该使用基于fork()的进程池;可以参考这两个链接了解更多信息:https://pythonspeed.com/articles/python-multiprocessing/https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

在Windows上,以及我认为在较新版本的macOS上,使用的是基于"spawn"的进程池。这也是你在Linux上应该使用的方式。在这种设置下,会启动一个新的Python进程。正如你所想的,新进程不会继承父进程的任何线程,因为它是一个全新的进程。

第二件要做的事是:你需要在进程池中的每个子进程中设置日志记录;父进程的日志设置不足以获取工作进程的日志。你可以通过Poolinitializer关键字参数来实现,比如写一个叫setup_logging()的函数,然后这样做:pool = multiprocessing.Pool(initializer=setup_logging)https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool)。

0

我最近在使用日志模块和Pathos多进程库时遇到了类似的问题。虽然我还不完全确定,但看起来在我的情况下,问题可能是因为日志处理器试图在不同的进程中重复使用一个锁对象。

我通过在默认的日志处理器周围加了一个简单的包装器来解决这个问题:

import threading
from collections import defaultdict
from multiprocessing import current_process

import colorlog


class ProcessSafeHandler(colorlog.StreamHandler):
    def __init__(self):
        super().__init__()

        self._locks = defaultdict(lambda: threading.RLock())

    def acquire(self):
        current_process_id = current_process().pid
        self._locks[current_process_id].acquire()

    def release(self):
        current_process_id = current_process().pid
        self._locks[current_process_id].release()
8

这可能是一个叫做 bug 6721 的问题。

这个问题在有锁、线程和分叉的情况下很常见。如果线程1在持有一个锁的情况下,线程2调用了分叉,那么在分叉出来的进程中,只会有线程2,而那个锁就会一直被占用,永远无法释放。在你的例子中,就是 logging.StreamHandler.lock 这个锁。

你可以在 这里 找到一个解决方案 (永久链接),这是针对 logging 模块的修复。记得也要处理其他的锁哦。

撰写回答