Python 2.x 中多进程线程安全打印

2 投票
1 回答
2139 浏览
提问于 2025-04-17 00:27

我遇到了一个很奇怪的问题,实在无法解释,就是在多个进程(用subprocess模块启动)向文件打印时出现的。我的输出有些地方稍微被截断了,还有一些内容完全消失了。我使用了Alex Martelli的一个稍微修改过的解决方案,目的是确保多个进程写同一个文件时,输出不会混在一起。我在写的时候会先把内容缓存起来,只有在看到换行符时才真正写入文件。


import sys
import threading

tls = threading.local()

class ThreadSafeFile(object):
    """
    @author: Alex Martelli
    @see: https://stackoverflow.com/questions/3029816/how-do-i-get-a-thread-safe-print-in-python-2-6
    @summary: Allows for safe printing of output of multi-threaded programs to stdout.
    """
    def __init__(self, f):
        self.f = f
        self.lock = threading.RLock()
        self.nesting = 0
        self.dataBuffer = ""

    def _getlock(self):
        self.lock.acquire()
        self.nesting += 1

    def _droplock(self):
        nesting = self.nesting
        self.nesting = 0
        for i in range(nesting):
            self.lock.release()

    def __getattr__(self, name):
        if name == 'softspace':
            return tls.softspace
        else:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        if name == 'softspace':
            tls.softspace = value
        else:
            return object.__setattr__(self, name, value)

    def write(self, data):
        self._getlock()
        self.dataBuffer += data
        if data == '\n':
            self.f.write(self.dataBuffer)
            self.f.flush()
            self.dataBuffer = ""
            self._droplock()

    def flush(self):
        self.f.flush()

需要注意的是,要让这个问题表现得异常,通常需要花费很多时间,或者在多处理器或多核心的机器上运行。我在一台单处理器的机器上测试了这个有问题的程序大约7000次,才发现了故障。为了展示我在测试中遇到的问题,我创建的这个程序在单处理器的机器上似乎也能正常工作,但在多核或多处理器的机器上运行时肯定会出问题。

下面的程序展示了这个问题,虽然比我想的要复杂一些,但我希望尽量保留我程序的行为。

进程1的代码 main.py


import subprocess, sys, socket, time, random

from threadSafeFile import ThreadSafeFile
sys.stdout = ThreadSafeFile(sys.__stdout__)

usage = "python main.py nprocs niters"

workerFilename = "/path/to/worker.py"

def startMaster(n, iters):
    host = socket.gethostname()
    for i in xrange(n):
        #set up ~synchronization between master and worker
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((host,0))
        sock.listen(1)
        socketPort = sock.getsockname()[1]

        cmd = 'ssh %s python  %s %s %d %d %d' % \
            (host, workerFilename, host, socketPort, i, iters)

        proc = subprocess.Popen(cmd.split(), shell=False, stdout=None, stderr=None)

        conn, addr = sock.accept()

        #wait for worker process to start
        conn.recv(1024)

        for j in xrange(iters):
            #do very bursty i/o
            for k in xrange(iters):
                print "master: %d iter: %d message: %d" % (n,i, j)

            #sleep for some amount of time between .02s and .5s
            time.sleep(1 * (random.randint(1,50) / float(100)))

        #wait for worker to finish
        conn.recv(1024)
        sock.close()
        proc.kill()

def main(nprocs, niters):
    startMaster(nprocs, niters)



if __name__ == "__main__":
    if len(sys.argv) != 3:
        print usage
        sys.exit(1)

    nprocs = int(sys.argv[1])
    niters = int(sys.argv[2])
    main(nprocs, niters)

进程2的代码 worker.py


import sys, socket,time, random, time
from threadSafeFile import ThreadSafeFile

usage = "python host port id iters"

sys.stdout = ThreadSafeFile(sys.__stdout__)

def main(host, port, n, iters):
    #tell master to start
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sock.send("begin")

    for i in xrange(iters):
        #do bursty i/o
        for j in xrange(iters):
            print "worker: %d iter: %d message: %d" % (n,i, j)

        #sleep for some amount of time between .02s and .5s
        time.sleep(1 * (random.randint(1,50) / float(100)))

    #tell master we are done    
    sock.send("done")
    sock.close()


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print usage
        sys.exit(1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    n = int(sys.argv[3])
    iters = int(sys.argv[4])
    main(host,port,n,iters)

在测试时,我是这样运行main.py的:

python main.py 1 75 > main.out

生成的文件应该有75*75*2 = 11250行,格式为:
(master|worker): %d iter: %d message: %d

大多数情况下,文件少了20到30行,但偶尔也会看到程序输出的行数是正确的。经过进一步调查,我发现那些偶尔成功的情况中,有些行被截断了,像这样:
ter: %d message: %d

另一个有趣的方面是,当我用multiprocessing而不是subprocess启动ssh进程时,这个程序就能正常工作。有些人可能会问,既然multiprocessing可以正常工作,为什么还要用subprocess。可我这个人就是想搞清楚为什么会出现这种异常行为。任何想法或见解都非常感谢。谢谢。

***编辑 Ben,我明白threadSafeFile为每个进程使用不同的锁,但我在我的大项目中需要它有两个原因。

1) 每个进程可能有多个线程会写入stdout,尽管这个例子没有。所以我需要确保在线程级别和进程级别的安全。

2) 如果我不确保stdout在刷新时,缓冲区末尾有一个'\n',那么就可能出现执行轨迹的问题,比如进程1写入文件时没有'\n',然后进程2又写入它的缓冲区。这样就会出现行混合的情况,这不是我想要的。

我也明白这个机制让可以打印的内容有些限制。现在,在我这个项目的开发阶段,限制是可以接受的。当我能保证正确性时,我可以开始放宽这些限制。

你提到的在条件检查中锁定数据 == '\n'是不正确的。如果锁在条件检查内部,那么threadSafeFile在一般情况下就不再是线程安全的。如果任何线程都可以向数据缓冲区添加内容,那么在dataBuffer += data时就会出现竞争条件,因为这不是一个原子操作。也许你的评论只是针对这个例子,因为我们每个进程只有一个线程,但如果是这样的话,我们根本不需要锁。

关于操作系统级别的锁,我的理解是,在unix平台上,多个程序能够安全地写入同一个文件,前提是写入的字节数小于内部缓冲区的大小。在这种情况下,难道不应该由操作系统来处理所有必要的锁定吗?

1 个回答

1

在每个进程中,你为 sys.stdout 创建了一个 ThreadSafeFile,每个都有一个锁,但这些锁是不同的;不同进程中的锁之间没有任何联系。所以,你得到的效果就像根本没有使用锁一样;因为每个进程都有自己的锁,所以一个进程不会被另一个进程持有的锁阻塞。

在单处理器的机器上,这种情况之所以能工作,是因为你在写入时做了缓冲,直到遇到换行符。这意味着每一行输出都是一次性写入的。在单处理器上,操作系统可能会在一系列连续的 write 调用中切换进程,这样会搞乱你的数据。但如果输出都是以单行的块写入,并且你不在乎行在文件中的顺序,那么在你关心的操作中发生上下文切换的可能性就非常非常小。不过,这并不是理论上不可能的,所以我不会称这段代码在单处理器上是正确的。

ThreadSafeFile 仅仅是线程安全的。它依赖于程序每个写入的文件只有一个 单一ThreadSafeFile 对象。因此,对该文件的任何写入都会通过这个单一的对象进行,并在锁上进行同步。

当你有多个进程时,你没有像单个进程中的线程那样共享的全局内存。所以每个进程必然有自己独立的 ThreadSafeFile(sys.stdout) 对象。这就和你使用线程时,创建了 N 个线程,每个线程都创建了自己的 ThreadSafeFile(sys.stdout) 是同样的错误。

我不知道在使用多进程时这怎么工作,因为你没有提供你用来实现的代码。但我理解的是,如果你以每个进程创建自己的新 ThreadSafeFile 的方式使用多进程,这仍然会失败,原因都是一样的。也许在你使用多进程的版本中并不是这样?

你需要做的是让同步对象(锁)以某种方式连接起来。多进程模块可以为你做到这一点。注意在这里的例子中,锁是一次性创建的,然后在每个新进程创建时传递给它们。(当然,这仍然会在 10 个不同的进程中产生 10 个不同的锁对象,但 Python 在后台必须是创建了一个操作系统级别的锁,然后让每个复制的 Python 级别的锁对象引用这个单一的操作系统级别的锁。)

如果你想用子进程来做,也就是从不同的脚本启动完全独立的工作命令,那么你需要某种方式让它们都能与一个操作系统级别的锁进行通信。我不知道标准库中有没有什么可以帮助你做到这一点的。我建议你使用多进程。


另外,你的缓冲和锁代码看起来也有点可疑。如果有什么东西调用了 sys.stdout.write("foo\n") 会发生什么?我不确定,但我猜这之所以能工作,是因为 print 的实现恰好在你打印的内容上调用了 sys.stdout.write,然后再调用一次带有单个换行符的 sys.stdout.write。根本没有理由它必须这样做!它完全可以在内存中组装一个完整的输出字符串,然后只调用一次 sys.stdout.write。再说,如果你需要打印一块多行内容,这些内容需要一起输出,会发生什么呢?

另一个问题是,你在进程第一次写入缓冲区时获取锁,继续持有锁直到缓冲区填满,然后写入这一行,最后释放锁。如果你的锁真的有效,而一个进程在开始一行和完成它之间花了很长时间,那么它会阻塞所有其他进程,连缓冲写入都无法进行!也许这正是你想要的,如果你的意图是当一个进程开始写入时,它能保证它的输出会优先写入文件。但在这种情况下,你根本不需要缓冲。我认为你应该在 if data == '\n': 之后获取锁,这样你就不需要跟踪嵌套级别的所有代码了。

撰写回答