Python 2.x 中多进程线程安全打印
我遇到了一个很奇怪的问题,实在无法解释,就是在多个进程(用subprocess模块启动)向文件打印时出现的。我的输出有些地方稍微被截断了,还有一些内容完全消失了。我使用了Alex Martelli的一个稍微修改过的解决方案,目的是确保多个进程写同一个文件时,输出不会混在一起。我在写的时候会先把内容缓存起来,只有在看到换行符时才真正写入文件。
import sys
import threading
tls = threading.local()
class ThreadSafeFile(object):
"""
@author: Alex Martelli
@see: https://stackoverflow.com/questions/3029816/how-do-i-get-a-thread-safe-print-in-python-2-6
@summary: Allows for safe printing of output of multi-threaded programs to stdout.
"""
def __init__(self, f):
self.f = f
self.lock = threading.RLock()
self.nesting = 0
self.dataBuffer = ""
def _getlock(self):
self.lock.acquire()
self.nesting += 1
def _droplock(self):
nesting = self.nesting
self.nesting = 0
for i in range(nesting):
self.lock.release()
def __getattr__(self, name):
if name == 'softspace':
return tls.softspace
else:
raise AttributeError(name)
def __setattr__(self, name, value):
if name == 'softspace':
tls.softspace = value
else:
return object.__setattr__(self, name, value)
def write(self, data):
self._getlock()
self.dataBuffer += data
if data == '\n':
self.f.write(self.dataBuffer)
self.f.flush()
self.dataBuffer = ""
self._droplock()
def flush(self):
self.f.flush()
需要注意的是,要让这个问题表现得异常,通常需要花费很多时间,或者在多处理器或多核心的机器上运行。我在一台单处理器的机器上测试了这个有问题的程序大约7000次,才发现了故障。为了展示我在测试中遇到的问题,我创建的这个程序在单处理器的机器上似乎也能正常工作,但在多核或多处理器的机器上运行时肯定会出问题。
下面的程序展示了这个问题,虽然比我想的要复杂一些,但我希望尽量保留我程序的行为。
进程1的代码 main.py
import subprocess, sys, socket, time, random
from threadSafeFile import ThreadSafeFile
sys.stdout = ThreadSafeFile(sys.__stdout__)
usage = "python main.py nprocs niters"
workerFilename = "/path/to/worker.py"
def startMaster(n, iters):
host = socket.gethostname()
for i in xrange(n):
#set up ~synchronization between master and worker
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host,0))
sock.listen(1)
socketPort = sock.getsockname()[1]
cmd = 'ssh %s python %s %s %d %d %d' % \
(host, workerFilename, host, socketPort, i, iters)
proc = subprocess.Popen(cmd.split(), shell=False, stdout=None, stderr=None)
conn, addr = sock.accept()
#wait for worker process to start
conn.recv(1024)
for j in xrange(iters):
#do very bursty i/o
for k in xrange(iters):
print "master: %d iter: %d message: %d" % (n,i, j)
#sleep for some amount of time between .02s and .5s
time.sleep(1 * (random.randint(1,50) / float(100)))
#wait for worker to finish
conn.recv(1024)
sock.close()
proc.kill()
def main(nprocs, niters):
startMaster(nprocs, niters)
if __name__ == "__main__":
if len(sys.argv) != 3:
print usage
sys.exit(1)
nprocs = int(sys.argv[1])
niters = int(sys.argv[2])
main(nprocs, niters)
进程2的代码 worker.py
import sys, socket,time, random, time
from threadSafeFile import ThreadSafeFile
usage = "python host port id iters"
sys.stdout = ThreadSafeFile(sys.__stdout__)
def main(host, port, n, iters):
#tell master to start
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.send("begin")
for i in xrange(iters):
#do bursty i/o
for j in xrange(iters):
print "worker: %d iter: %d message: %d" % (n,i, j)
#sleep for some amount of time between .02s and .5s
time.sleep(1 * (random.randint(1,50) / float(100)))
#tell master we are done
sock.send("done")
sock.close()
if __name__ == "__main__":
if len(sys.argv) != 5:
print usage
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2])
n = int(sys.argv[3])
iters = int(sys.argv[4])
main(host,port,n,iters)
在测试时,我是这样运行main.py的:
python main.py 1 75 > main.out
生成的文件应该有75*75*2 = 11250行,格式为:
(master|worker): %d iter: %d message: %d
大多数情况下,文件少了20到30行,但偶尔也会看到程序输出的行数是正确的。经过进一步调查,我发现那些偶尔成功的情况中,有些行被截断了,像这样:
ter: %d message: %d
另一个有趣的方面是,当我用multiprocessing而不是subprocess启动ssh进程时,这个程序就能正常工作。有些人可能会问,既然multiprocessing可以正常工作,为什么还要用subprocess。可我这个人就是想搞清楚为什么会出现这种异常行为。任何想法或见解都非常感谢。谢谢。
***编辑 Ben,我明白threadSafeFile为每个进程使用不同的锁,但我在我的大项目中需要它有两个原因。
1) 每个进程可能有多个线程会写入stdout,尽管这个例子没有。所以我需要确保在线程级别和进程级别的安全。
2) 如果我不确保stdout在刷新时,缓冲区末尾有一个'\n',那么就可能出现执行轨迹的问题,比如进程1写入文件时没有'\n',然后进程2又写入它的缓冲区。这样就会出现行混合的情况,这不是我想要的。
我也明白这个机制让可以打印的内容有些限制。现在,在我这个项目的开发阶段,限制是可以接受的。当我能保证正确性时,我可以开始放宽这些限制。
你提到的在条件检查中锁定数据 == '\n'是不正确的。如果锁在条件检查内部,那么threadSafeFile在一般情况下就不再是线程安全的。如果任何线程都可以向数据缓冲区添加内容,那么在dataBuffer += data时就会出现竞争条件,因为这不是一个原子操作。也许你的评论只是针对这个例子,因为我们每个进程只有一个线程,但如果是这样的话,我们根本不需要锁。
关于操作系统级别的锁,我的理解是,在unix平台上,多个程序能够安全地写入同一个文件,前提是写入的字节数小于内部缓冲区的大小。在这种情况下,难道不应该由操作系统来处理所有必要的锁定吗?
1 个回答
在每个进程中,你为 sys.stdout
创建了一个 ThreadSafeFile
,每个都有一个锁,但这些锁是不同的;不同进程中的锁之间没有任何联系。所以,你得到的效果就像根本没有使用锁一样;因为每个进程都有自己的锁,所以一个进程不会被另一个进程持有的锁阻塞。
在单处理器的机器上,这种情况之所以能工作,是因为你在写入时做了缓冲,直到遇到换行符。这意味着每一行输出都是一次性写入的。在单处理器上,操作系统可能会在一系列连续的 write
调用中切换进程,这样会搞乱你的数据。但如果输出都是以单行的块写入,并且你不在乎行在文件中的顺序,那么在你关心的操作中发生上下文切换的可能性就非常非常小。不过,这并不是理论上不可能的,所以我不会称这段代码在单处理器上是正确的。
ThreadSafeFile
仅仅是线程安全的。它依赖于程序每个写入的文件只有一个 单一 的 ThreadSafeFile
对象。因此,对该文件的任何写入都会通过这个单一的对象进行,并在锁上进行同步。
当你有多个进程时,你没有像单个进程中的线程那样共享的全局内存。所以每个进程必然有自己独立的 ThreadSafeFile(sys.stdout)
对象。这就和你使用线程时,创建了 N 个线程,每个线程都创建了自己的 ThreadSafeFile(sys.stdout)
是同样的错误。
我不知道在使用多进程时这怎么工作,因为你没有提供你用来实现的代码。但我理解的是,如果你以每个进程创建自己的新 ThreadSafeFile
的方式使用多进程,这仍然会失败,原因都是一样的。也许在你使用多进程的版本中并不是这样?
你需要做的是让同步对象(锁)以某种方式连接起来。多进程模块可以为你做到这一点。注意在这里的例子中,锁是一次性创建的,然后在每个新进程创建时传递给它们。(当然,这仍然会在 10 个不同的进程中产生 10 个不同的锁对象,但 Python 在后台必须是创建了一个操作系统级别的锁,然后让每个复制的 Python 级别的锁对象引用这个单一的操作系统级别的锁。)
如果你想用子进程来做,也就是从不同的脚本启动完全独立的工作命令,那么你需要某种方式让它们都能与一个操作系统级别的锁进行通信。我不知道标准库中有没有什么可以帮助你做到这一点的。我建议你使用多进程。
另外,你的缓冲和锁代码看起来也有点可疑。如果有什么东西调用了 sys.stdout.write("foo\n")
会发生什么?我不确定,但我猜这之所以能工作,是因为 print
的实现恰好在你打印的内容上调用了 sys.stdout.write
,然后再调用一次带有单个换行符的 sys.stdout.write
。根本没有理由它必须这样做!它完全可以在内存中组装一个完整的输出字符串,然后只调用一次 sys.stdout.write
。再说,如果你需要打印一块多行内容,这些内容需要一起输出,会发生什么呢?
另一个问题是,你在进程第一次写入缓冲区时获取锁,继续持有锁直到缓冲区填满,然后写入这一行,最后释放锁。如果你的锁真的有效,而一个进程在开始一行和完成它之间花了很长时间,那么它会阻塞所有其他进程,连缓冲写入都无法进行!也许这正是你想要的,如果你的意图是当一个进程开始写入时,它能保证它的输出会优先写入文件。但在这种情况下,你根本不需要缓冲。我认为你应该在 if data == '\n':
之后获取锁,这样你就不需要跟踪嵌套级别的所有代码了。