如何在Python中进行原子写入到stdout?

9 投票
2 回答
5320 浏览
提问于 2025-04-18 14:56

我在一些资料中看到,print命令不是线程安全的,解决办法是用sys.stdout.write命令来代替,但对我来说,这个方法还是不管用,写入STDOUT的过程也不是原子的。

这里有一个简单的例子(这个文件叫做parallelExperiment.py):

   import os
   import sys
   from multiprocessing import Pool

   def output(msg):
    msg = '%s%s' % (msg, os.linesep)
    sys.stdout.write(msg)

   def func(input):
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input)))

   def executeFunctionInParallel(funcName, inputsList, maxParallelism):
       output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
           funcName.__name__, len(inputsList), maxParallelism))
       parallelismPool = Pool(processes=maxParallelism)
       executeBooleanResultsList = parallelismPool.map(funcName, inputsList)
       parallelismPool.close()
       output(u'Function %s executed on input of size %d  with maximum parallelism of %d' % (
           funcName.__name__, len(inputsList), maxParallelism))
       # if all parallel executions executed well - the boolean results list should all be True
       return all(executeBooleanResultsList)

   if __name__ == "__main__":
    inputsList=[str(i) for i in range(20)]
    executeFunctionInParallel(func, inputsList, 4)

看看输出结果:

i. 调用python parallelExperiment.py的输出(注意有些行中的“pid”这个词显示得乱七八糟):

Executing function func on input of size 20 with maximum parallelism of 4
ppid:2240 got input "0"
id:4960 got input "2"
pid:4716 got input "4"
pid:4324 got input "6"
ppid:2240 got input "1"
id:4960 got input "3"
pid:4716 got input "5"
pid:4324 got input "7"
ppid:4960 got input "8"
id:2240 got input "10"
pid:4716 got input "12"
pid:4324 got input "14"
ppid:4960 got input "9"
id:2240 got input "11"
pid:4716 got input "13"
pid:4324 got input "15"
ppid:4960 got input "16"
id:2240 got input "18"
ppid:2240 got input "19"
id:4960 got input "17"
Function func executed on input of size 20  with maximum parallelism of 4

ii. 调用python parallelExperiment.py > parallelExperiment.log的输出,这意味着把stdout重定向到parallelExperiment.log文件中(注意行的顺序不好,因为在调用executeFunctionInParallel之前和之后,应该打印一条消息):

pid:3244 got input "4"
pid:3244 got input "5"
pid:3244 got input "12"
pid:3244 got input "13"
pid:240 got input "0"
pid:240 got input "1"
pid:240 got input "8"
pid:240 got input "9"
pid:240 got input "16"
pid:240 got input "17"
pid:1268 got input "2"
pid:1268 got input "3"
pid:1268 got input "10"
pid:1268 got input "11"
pid:1268 got input "18"
pid:1268 got input "19"
pid:3332 got input "6"
pid:3332 got input "7"
pid:3332 got input "14"
pid:3332 got input "15"
Executing function func on input of size 20 with maximum parallelism of 4
Function func executed on input of size 20  with maximum parallelism of 4

2 个回答

2

如果你想避免文件锁定,并且愿意使用更底层的接口,你可以通过 os.openos.write 来实现 POSIX 的 O_APPEND 行为(前提是你的系统支持这些功能);你可以查看这个链接了解更多信息:在 UNIX 中,文件追加操作是原子性的吗?

8

这个问题发生是因为 multiprocessing.Pool 实际上使用的是子进程,而不是线程。你需要在进程之间进行明确的同步。注意,链接中的例子可以解决你的问题。

import os
import sys
from multiprocessing import Pool, Lock

lock = Lock()

def output(msg):
    msg = '%s%s' % (msg, os.linesep)
    with lock:
        sys.stdout.write(msg)

def func(input):
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input)))

def executeFunctionInParallel(funcName, inputsList, maxParallelism):
    output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
      funcName.__name__, len(inputsList), maxParallelism))
    parallelismPool = Pool(processes=maxParallelism)
    executeBooleanResultsList = parallelismPool.map(funcName, inputsList)
    parallelismPool.close()
    parallelismPool.join()
    output(u'Function %s executed on input of size %d  with maximum parallelism of %d' % (
       funcName.__name__, len(inputsList), maxParallelism))
    # if all parallel executions executed well - the boolean results list should all be True
    return all(executeBooleanResultsList)

if __name__ == "__main__":
    inputsList=[str(i) for i in range(20)]
    executeFunctionInParallel(func, inputsList, 4)

撰写回答