跨进程共享多进程同步原语

2024-05-29 04:42:32 发布

您现在位置:Python中文网/ 问答频道 /正文

(python3.4,Linux)。在

我有一个主进程“p”,它分叉8个进程(“C1”到“C8”)。我想创建multiprocessing.Barrier,以确保所有8个子进程在某一时刻同步。在

如果我在父进程中定义同步原语,那么一切都会正常工作,这样当我分叉子进程时,它会被正确继承:

import multiprocessing as mp
barrier = mp.Barrier(8)

def f():
  # do something
  barrier.wait()
  # do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

但是在我的例子中,直到子进程启动之后,我才知道创建Barrier对象所需的详细信息(我不知道要作为其action参数传递的参数)。因此,我想在其中一个子进程中创建Barrier,但是我不知道如何使它对其他子进程可用。由于子进程中的8Barrier对象彼此完全独立,因此下面的操作将不起作用:

^{pr2}$

我在考虑在其中一个子进程中创建barrier,并使用multiprocessing.Queue(或者如果{}不接受{}对象,则使用multiprocessing.Manager().Barrier)将其传递给其他进程。然而,即使这样做,我也不知道如何确保只有一个进程实际上put将同步原语的(7个副本)放到队列中,而其他进程只会get它们。(当然,我可以在父进程中创建另一个同步原语来完成这一任务,但是我还是可以重构代码,在父进程中创建原始的Barrier。)


Tags: 对象进程mainlinuxdefmpmultiprocessingdo
3条回答

下面是一个例子,通过在一个子级中创建一个multiprocessing.managers.BaseManager,然后从所有其他子级连接到该管理器。注意,它需要从父对象向所有子对象传递一个multiprocessing.Lock,这是您提到的最好避免的。不过,我不确定还有其他选择。在

import multiprocessing as mp
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

def f(lock):
  # do something
  with lock:
      try:
          MyManager.register('get_barrier')
          m = MyManager(address=('localhost', 5555), authkey=b'akey')
          m.connect()
          b = m.get_barrier()
          print("Got the barrier from the manager")
      except OSError as e:
          # We are the first. Create the manager, register
          # a mp.Barrier instance with it, and start it up.
          print("Creating the manager...")
          b = mp.Barrier(8)
          MyManager.register('get_barrier', callable=lambda:b)
          m = MyManager(address=('localhost', 5555), authkey=b'akey')
          m.start()
  b.wait()
  print("Done!")
  # do more stuff

def main():
    lock = mp.Lock()
    for i in range(8):
        p = mp.Process(target=f, args=(lock,))
        p.start()

if __name__ == '__main__':
  main()

输出:

^{pr2}$

下面是一个同样适用于Windows的版本(缺少的fork会引起额外的麻烦):

import multiprocessing as mp

def procs(uid_barrier):
    uid, barrier = uid_barrier
    print(uid, 'waiting')
    barrier.wait()
    print(uid, 'past barrier')    

def main():
    N_PROCS = 10
    with mp.Manager() as man:
        barrier = man.Barrier(N_PROCS)
        with mp.Pool(N_PROCS) as p:
            p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))

if __name__ == '__main__':
    mp.freeze_support()
    main()

是否可以简单地捕获进程的id,然后只在其中一个进程中调用您的操作?像这样?在

import multiprocessing as mp
barrier = mp.Barrier(8)

def f():
  # create action
  def action():
      print("action was run on process {}.".format(id))

  # do something
  print("Hello from process {}.".format(id))
  id = barrier.wait()
  if id == 0:
      action()
  barrier.wait()

  # Do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

相关问题 更多 >

    热门问题