如何在多进程中使用共享内存概念

1 投票
1 回答
627 浏览
提问于 2025-04-19 18:16

我想同时启动两个进程,并且它们会使用一个共享的变量。其中一个进程会立即开始处理,而另一个进程则会等待第一个进程的触发信号(和共享变量),然后再开始处理。

第一个进程负责计算距离,第二个进程则根据已经走过的距离做出不同的反应。Distance 是作为参数传递的,而 current_conveyer 是共享的内存变量。

以下是我的代码:

def process1():

    current_conveyer = Value('d', 'SC1')   # also I want to know how to initialize the string values. Current it is double precision float.

    while condition:
        conveyer_type = current_conveyer.value
        S = pickle.load(open('conveyer_speed.p','rb'))[conveyer_type]
        D = S * T # speed is changing, hence calculating the speed at every instant.
        # trigger the second process. NOT create a new process
        time.sleep(0.005)

def process2(current_converyer,distance):
    while True:
        if some condition:
              current_converyer = 'SC2'
        elif some condition:
              current_converyer = 'SC3'

目前,我在每个循环中都启动一个新的进程。

我想创建一个单独的进程来处理所有这些,它会一直监听并共享这个变量。如果收到任何触发信号,这个进程应该能够响应,醒来并开始工作,而不是每次都创建一个全新的进程。

我知道可以通过队列和管道来实现这个功能,但使用这些方法就失去了共享内存的意义。

我尝试过单独使用队列和管道来实现上面的代码,但遇到了一些时间效率的问题,所以现在想尝试使用共享内存变量的方法。

所以,基于以上情况,我想知道如何让进程保持监听状态,并同时实现共享内存的概念。

1 个回答

0

最简单的多进程方法是使用一个 Pool 来执行几个较大的任务,每个任务都会把结果返回给主进程。

如果你真的想要一个“共享”的变量,可以创建一个 Manager 对象,并用它来创建可以共享的对象。这样,多个进程就可以读取和写入不同的值,而这些值会在不同的子进程之间传递。

下面的代码有一个简单的“主”代码,它启动了两个子进程,然后等待它们完成。第一个子进程醒来后,往一个共享的列表里添加一个项目,然后再等待。第二个子进程等了一会儿,消费并打印这个共享列表,然后再睡觉。

源代码

import multiprocessing, signal, time

def producer(objlist):
    '''
    add an item to list every sec
    '''
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return
        msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
        objlist.append( msg )
        print msg


def scanner(objlist):
    '''
    every now and then, consume objlist & run calculation
    '''
    while True:
        try:
            time.sleep(3)
        except KeyboardInterrupt:
            return
        print 'items: {}'.format( list(objlist) )
        objlist[:] = []


def main():

    # create obj sharable between all processes
    manager = multiprocessing.Manager()
    my_objlist = manager.list() # pylint: disable=E1101

    multiprocessing.Process(
        target=producer, args=(my_objlist,),
    ).start()

    multiprocessing.Process(
        target=scanner, args=(my_objlist,),
    ).start()

    # kill everything after a few seconds
    signal.signal(
        signal.SIGALRM, 
        lambda _sig,_frame: manager.shutdown(),
        )
    signal.alarm(12)

    try:
        manager.join() # wait until both workers die
    except KeyboardInterrupt:
        pass


if __name__=='__main__':
    main()

输出结果

ding: 8392
ding: 8393
ding: 8394
ding: 8395
ding: 8396
ding: 8397

撰写回答