使用地址在python进程之间共享ctype内存

2024-04-25 21:47:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试在python中跨多个进程发送动态数组。我的第一个解决方案是通过多处理类的队列/管道直接发送数据。问题是它受到以太网连接带宽的限制。因此,我尝试使用ctype数组并只传递对象的地址。当我尝试从第二个进程(A.raw或A.value)访问数组时,进程将毫无例外地退出。有人知道发生了什么事吗?可能是锁有问题等

from multiprocessing import Process,Queue
from ctypes import c_char,addressof

from time import sleep
import os




class ProcessIn(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        Array=[]
        while True:
            N=100000
            A=(c_char*N)()
            A.value=b'\x01'
            Address=addressof(A)
            Array.append(A)
            print(os.getpid(),'putted',Address)
            self.QueueI.put((Address,N))
            sleep(2)





class ProcessOut(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        while True:
            print(os.getpid(),'step 1')
            Address,N=self.QueueI.get()
            print(os.getpid(),'step 2',Address)
            A=(c_char*N).from_address(Address)      
            print(os.getpid(),'step 3')
            Value=A.raw         #This will fail 
            print(os.getpid(),'step 4',Value)   
            sleep(1)

if __name__ == '__main__':
    QueueI=Queue()

    In=ProcessIn(QueueI)
    Out=ProcessOut(QueueI)
    print(os.getpid(),'main')
    In.start()
    Out.start()
    input('press key to finish\n')

Tags: fromimportselftrue进程initosaddress
1条回答
网友
1楼 · 发布于 2024-04-25 21:47:44

好的,我知道了-使用带标签的mmap:

from multiprocessing import Process,Queue
from ctypes import c_char,addressof
import pyarrow as pa
import numpy as np
from time import sleep
import os
import datetime
import mmap
from sys import getsizeof



class ProcessIn(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):  
        i=0
        while True:
            N=np.random.randint(10,14)*100000
            data = b'abcdefghijklmnopqrstuvwxyz'        
            Tag='Data_'+str(i)
            buf = mmap.mmap(-1, N*len(data),tagname=Tag)            
            buf[0]=i
            NN=N*len(data)
            # print(buf[0:10])

            print(os.getpid(),'putted',Tag,NN)
            if self.QueueI.qsize()==0:
                self.QueueI.put((Tag,NN,datetime.datetime.now()))           


            i+=1
            sleep(1)


class ProcessOut(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        while True:
            # print(os.getpid(),'step 1')
            Tag,N,start=self.QueueI.get()           
            buf =  mmap.mmap(-1, N,tagname=Tag)

            print('got',buf[0:10],Tag)

            # data=buf.read()
            dt=(datetime.datetime.now()-start).total_seconds()
            if dt!=0:
                print(os.getpid(),N/dt/1024**2,'MBs',dt*1000,'ms',N/1024**2,'MB',N) 
            else:
                print(os.getpid(),np.nan,'MBs',dt*1000,'ms',N/1024**2,'MB',N)   
            buf=None

if __name__ == '__main__':
    QueueI=Queue()

    In=ProcessIn(QueueI)
    Out=ProcessOut(QueueI)
    print(os.getpid(),'main')
    In.start()
    Out.start()
    input('press key to finish\n')

相关问题 更多 >