Using multiprocessing in a child process
On Windows, before using multiprocessing you have to check whether the current process is the main process, otherwise you get an endless loop of process creation.
I tried changing the name of the process to the name of the child process so that I could use multiprocessing inside a class or function, but without success. Is this possible at all? So far I have only managed to use multiprocessing from the main process.
If it is possible, could someone give me an example of how to use multiprocessing inside a class or function that is called by an upper process? Thanks.
Edit:
Here is an example - the first one works, but everything lives in one file:
simplemtexample3.py:

import random
import multiprocessing
import math

def mp_factorizer(nums, nprocs):
    # guard the process
    # print(__name__)
    if __name__ == '__main__':
        out_q = multiprocessing.Queue()
        chunksize = int(math.ceil(len(nums) / float(nprocs)))
        procs = []
        for i in range(nprocs):
            p = multiprocessing.Process(
                    target=worker,
                    args=(nums[chunksize * i:chunksize * (i + 1)],
                          out_q))
            procs.append(p)
            p.start()

        # Collect all results into a single result list. We know
        # how many lists with results to expect.
        resultlist = []
        for i in range(nprocs):
            temp = out_q.get()
            index = 0
            # print(temp)
            for i in temp:
                resultlist.append(temp[index][0][0:])
                index += 1

        # Wait for all worker processes to finish
        for p in procs:
            p.join()

        resultlist2 = [x for x in resultlist if x != []]
        return resultlist2

def worker(nums, out_q):
    """The worker function, invoked in a process. 'nums' is a
    list of numbers to process (each is doubled here). The results
    are placed in a list that is pushed to a queue.
    """
    outlist = []
    for n in nums:
        newnumber = n * 2
        newnumberasstring = str(newnumber)
        if newnumber:
            outlist.append(newnumberasstring)
    out_q.put(outlist)

l = []
for i in range(80):
    l.append(random.randint(1, 8))
print(mp_factorizer(l, 4))
However, when I try to call mp_factorizer from another file, it does not work, because of the if __name__ == '__main__' check:
simplemtexample.py:

import random
import multiprocessing
import math

def mp_factorizer(nums, nprocs):
    # guard the process
    # print(__name__)
    if __name__ == '__main__':
        out_q = multiprocessing.Queue()
        chunksize = int(math.ceil(len(nums) / float(nprocs)))
        procs = []
        for i in range(nprocs):
            p = multiprocessing.Process(
                    target=worker,
                    args=(nums[chunksize * i:chunksize * (i + 1)],
                          out_q))
            procs.append(p)
            p.start()

        # Collect all results into a single result list. We know
        # how many lists with results to expect.
        resultlist = []
        for i in range(nprocs):
            temp = out_q.get()
            index = 0
            # print(temp)
            for i in temp:
                resultlist.append(temp[index][0][0:])
                index += 1

        # Wait for all worker processes to finish
        for p in procs:
            p.join()

        resultlist2 = [x for x in resultlist if x != []]
        return resultlist2

def worker(nums, out_q):
    """The worker function, invoked in a process. 'nums' is a
    list of numbers to process (each is doubled here). The results
    are placed in a list that is pushed to a queue.
    """
    outlist = []
    for n in nums:
        newnumber = n * 2
        newnumberasstring = str(newnumber)
        if newnumber:
            outlist.append(newnumberasstring)
    out_q.put(outlist)
startsimplemtexample.py:

import simplemtexample as smt
import random

l = []
for i in range(80):
    l.append(random.randint(1, 8))
print(smt.mp_factorizer(l, 4))
2 Answers
There is no os.fork on Windows. So when you use the multiprocessing module on Windows, it starts a new Python interpreter and re-imports the script that called multiprocessing.Process.
The point of using if __name__ == '__main__' is to protect the call to multiprocessing.Process from being executed again when the script is re-imported. (Without that protection you get a fork bomb.)
If you call multiprocessing.Process from inside a class or function that is not executed when the script is re-imported, then there is no problem: you can use multiprocessing.Process as usual.
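A minimal sketch of that pattern (the file name and function names here are made up for illustration): the processes are spawned inside a function, and the guard sits at module level, so a child re-importing the file never re-runs the spawn:

guarded_spawn.py:

import multiprocessing

def double(n, out_q):
    # runs in a child process and reports one result
    out_q.put(n * 2)

def run_workers(nums):
    # the spawning happens inside a function, so a bare
    # re-import of this file does not start any processes
    out_q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=double, args=(n, out_q))
             for n in nums]
    for p in procs:
        p.start()
    results = [out_q.get() for _ in procs]
    for p in procs:
        p.join()
    return results

if __name__ == '__main__':
    # only the main process gets past this guard
    print(run_workers([1, 2, 3]))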
if __name__ == '__main__' is necessary on Windows whenever you want to use multiprocessing.
On Windows it works like this: for every worker process you want to spawn, Windows automatically restarts the main process and re-reads all required files. However, only the process that was started first counts as the main process. That is why guarding the execution of mp_factorizer with if __name__ == '__main__' stops multiprocessing from creating an endless loop.
Put simply, Windows needs to re-read the file that contains the worker, plus every function the worker calls - once for each worker. By blocking the execution of mp_factorizer we make sure that no additional workers are created, while Windows can still execute the workers themselves. This is why multiprocessing examples that keep all the code in one file guard the creation of the workers directly (as mp_factorizer does here) rather than the worker function itself: if everything lived in one file and the entire file were protected, no worker could be created at all.
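If you want to watch this re-import happen, a small sketch like the following (hypothetical file reimportdemo.py) makes it visible: with the spawn behaviour on Windows, the module-level print runs once in the parent (MainProcess) and once more in every child, while the guarded block runs only in the parent:

reimportdemo.py:

import multiprocessing

# runs in the parent AND again in every spawned child,
# because each child re-imports this file
print("module imported in: " + multiprocessing.current_process().name)

def work():
    print("worker running in: " + multiprocessing.current_process().name)

if __name__ == '__main__':
    # only the parent reaches this block
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()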
If the multiprocessing code lives in another file and gets called from there, if __name__ == '__main__' needs to be placed directly above the call:
mpteststart.py:

import random
import mptest as smt

l = []
for i in range(4):
    l.append(random.randint(1, 8))
print("random numbers generated")
if __name__ == '__main__':
    print(smt.mp_factorizer(l, 4))
mptest.py:

import multiprocessing
import math

print("Reading mptest.py file")

def mp_factorizer(nums, nprocs):
    out_q = multiprocessing.Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      out_q))
        procs.append(p)
        p.start()

    # Collect all results into a single result list. We know
    # how many lists with results to expect.
    resultlist = []
    for i in range(nprocs):
        temp = out_q.get()
        index = 0
        # print(temp)
        for i in temp:
            resultlist.append(temp[index][0][0:])
            index += 1

    # Wait for all worker processes to finish
    for p in procs:
        p.join()

    resultlist2 = [x for x in resultlist if x != []]
    return resultlist2

def worker(nums, out_q):
    """The worker function, invoked in a process. 'nums' is a
    list of numbers to process (each is doubled here). The results
    are placed in a list that is pushed to a queue.
    """
    print("worker started")
    outlist = []
    for n in nums:
        newnumber = n * 2
        newnumberasstring = str(newnumber)
        if newnumber:
            outlist.append(newnumberasstring)
    out_q.put(outlist)
In the code above, if __name__ == '__main__' has been removed, since it is already present in the calling file. However, the result is somewhat unexpected:
Reading mptest.py file
random numbers generated
Reading mptest.py file
random numbers generated
worker started
Reading mptest.py file
random numbers generated
worker started
Reading mptest.py file
random numbers generated
worker started
Reading mptest.py file
random numbers generated
worker started
['1', '1', '4', '1']
Multiprocessing is blocked from endless execution, but the rest of the code is still executed several times (the random number generation in this example). That not only costs performance, it can also lead to other nasty bugs. The solution is to protect the whole main process from being re-executed by Windows whenever multiprocessing is used somewhere:
import random
import mptest as smt

if __name__ == '__main__':
    l = []
    for i in range(4):
        l.append(random.randint(1, 8))
    print("random numbers generated")
    print(smt.mp_factorizer(l, 4))
Now we get back only the expected output, and the random numbers are generated just once:
Reading mptest.py file
random numbers generated
Reading mptest.py file
worker started
Reading mptest.py file
worker started
Reading mptest.py file
worker started
Reading mptest.py file
worker started
['1', '6', '2', '1']
Note that in this example mpteststart.py is the main process. If it is not, if __name__ == '__main__' has to be moved up the calling chain until it sits in the main process. Once the main process is protected this way, there is no more unwanted repeated code execution.
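That "move it up the calling chain" advice can be sketched like this (the module names middle and toplevel are made up; mptest.py stays exactly as shown above). Only the entry script, which Windows treats as the main process, carries the guard:

middle.py:

import mptest

def run(nums):
    # no guard needed here: this only runs when the already
    # guarded entry script calls it
    return mptest.mp_factorizer(nums, 4)

toplevel.py:

import random
import middle

if __name__ == '__main__':
    nums = []
    for i in range(4):
        nums.append(random.randint(1, 8))
    print(middle.run(nums))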