Python multiprocessing synchronization

3 votes
2 answers
13482 views
Asked on 2025-04-18 17:41

I have a function called "function" that I want to call 10 times with multiprocessing, running 5 processes at a time (one per CPU) in 2 rounds.

So I need a way to synchronize the processes between rounds; the code below shows roughly what I want to end up with.

I'd like to know whether there is a way to do this without using a multiprocessing Pool. When I use a Pool I get strange errors such as "UnboundLocalError: local variable 'fd' referenced before assignment" (I don't have a variable with that name anywhere), and the processes also seem to terminate at random.

If possible, I would like to do this without a Pool. Thanks!

import multiprocessing

number_of_cpus = 5
number_of_iterations = 2
number_of_files_per_process = 10  # placeholder value; comes from elsewhere in my real code

# An array for the processes.
processing_jobs = []

# Start 5 processes 2 times.
for iteration in range(0, number_of_iterations):

    # TODO SYNCHRONIZE HERE

    # Start 5 processes at a time.
    for cpu_number in range(0, number_of_cpus):

        # Calculate an offset for the current function call.
        file_offset = iteration * cpu_number * number_of_files_per_process

        p = multiprocessing.Process(target=function, args=(file_offset,))
        processing_jobs.append(p)
        p.start()

    # TODO SYNCHRONIZE HERE

Here is the (anonymized) traceback of the error I get when running the code with a Pool:

Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "python_code_3.py", line 88, in function_x
    xyz = python_code_1.function_y(args)
  File "/python_code_1.py", line 254, in __init__
    self.WK =  file.WK(filename)
  File "/python_code_2.py", line 1754, in __init__
    self.__parse__(name, data, fast_load)
  File "/python_code_2.py", line 1810, in __parse__
    fd.close()
UnboundLocalError: local variable 'fd' referenced before assignment

Most of the processes crash like this, but not all of them. The crashes seem to get more frequent when I increase the number of processes, so I also wonder whether it could be a memory limit issue…
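
If I read the traceback correctly, fd is only assigned on a code path that can fail, while the cleanup still runs. Something shaped like the sketch below (just my guess at what python_code_2.py does, not the real code) would produce this kind of error, and the UnboundLocalError would then be masking the real exception from open(), which could well be resource-related (too many open files, or memory pressure) when many processes run at once:

# Just a guess at the shape of the parsing code in python_code_2.py, not the real thing.
def parse_file(name):
    try:
        fd = open(name, 'rb')   # if open() fails (e.g. too many open files), fd is never assigned...
        raw = fd.read()
        # ... parse raw here ...
    finally:
        fd.close()              # ...but this cleanup still runs and raises UnboundLocalError

parse_file('/nonexistent/path')  # raises: UnboundLocalError: local variable 'fd' referenced before assignment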

2 Answers

1

A Pool is very simple to use. Here is a complete example:

Source

import multiprocessing

def calc(num):
    return num*2

if __name__=='__main__':  # required for Windows
    pool = multiprocessing.Pool()   # one Process per CPU
    for output in pool.map(calc, [1,2,3]):
        print 'output:',output

Output

output: 2
output: 4
output: 6
1

Here is how you can get the synchronization you are looking for without using a Pool:

import multiprocessing

def function(arg):
    print("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2
    number_of_files_per_process = 10  # example value; use whatever your real code needs

    # An array for the processes.
    processing_jobs = []

    # Start 5 processes 2 times.
    for iteration in range(1, number_of_iterations+1):  # Start the range from 1 so we don't multiply by zero.

        # Start 5 processes at a time.
        for cpu_number in range(1, number_of_cpus+1):

            # Calculate an offset for the current function call.
            file_offset = iteration * cpu_number * number_of_files_per_process

            p = multiprocessing.Process(target=function, args=(file_offset,))
            processing_jobs.append(p)
            p.start()

        # Wait for all processes to finish.
        for proc in processing_jobs:
            proc.join()

        # Empty active job list.
        del processing_jobs[:]

        # Write file here
        print("Writing")

And here is a version that uses a Pool:

import multiprocessing

def function(arg):
    print("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2
    number_of_files_per_process = 10  # example value; use whatever your real code needs

    pool = multiprocessing.Pool(number_of_cpus)
    for i in range(1, number_of_iterations+1): # Start the range from 1 so we don't multiply by zero
        file_offsets = [number_of_files_per_process * i * cpu_num for cpu_num in range(1, number_of_cpus+1)] 
        pool.map(function, file_offsets)
        print("Writing")
        # Write file here

As you can see, the Pool version looks cleaner.

That said, this doesn't fix your traceback. It's hard to say what to do about it, because I don't know exactly what is causing it. You may need a multiprocessing.Lock to synchronize access to a shared resource.
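
For example, if the crashes turn out to come from several workers opening or writing the same file at the same time, you can hand each process a shared lock and hold it only around the part that touches the shared resource. This is just a minimal sketch (the shared-file theory is a guess, and function here is a stand-in for your real work):

import multiprocessing

def function(lock, file_offset):
    # ... do the independent per-process work here ...
    with lock:
        # Only the part that touches the shared resource runs one process at a time.
        print("process with offset %s has exclusive access" % file_offset)

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    jobs = []
    for cpu_number in range(1, 6):
        p = multiprocessing.Process(target=function, args=(lock, cpu_number))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()

Note that with a Pool you cannot pass the lock through map(); it has to be given to the workers via the Pool's initializer argument instead.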
