复杂对象的多处理远程管理器

2024-04-20 01:36:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在N进程之间共享一个数据帧。 我尝试用多处理库来实现这个任务。 我使用带有命名空间的远程SyncManager。 如果我以几秒钟的间隔并行执行此代码两次,则第二个名称空间对象没有df_parquet属性

我想知道如何获得第一次执行的df_拼花地板实例

import time
from multiprocessing import Process, Lock
from multiprocessing.managers import SyncManager
from threading import Thread
import pandas as pd

class ParquetDataframe(object):
    def __init__(self,manager,global_val):
        self.manager = manager
        self.Global = global_val
        if not hasattr(self.Global, 'df_parquet'):
            print("Global.df_parquet - Init")
            lock = self.manager.Lock()
            lock.acquire()
            data = [['tom', 10], ['nick', 15], ['juli', 14]]
            df = pd.DataFrame(data, columns=['Name', 'Age'])
            self.Global.df_parquet = df
            lock.release()
        print("Global.df_parquet : "+str(self.Global.df_parquet))

    def get_value(self):
        return self.Global.df_parquet

def func(parquet):
    for i in range(50):
        time.sleep(1)
        parquet.get_value()

def serve(manager):
    print("Manager creation - Begin")
    manager.get_server().serve_forever()
    print("Manager creation - End")

class Manager(SyncManager):
    pass

if __name__ == '__main__':
    print("Initialisation step - Begin")
    Global = None
    manager = Manager(address=('127.0.0.1', 50000),authkey=bytes("12345", encoding='utf8'))
    print("Initialisation step - End")
    try:
        print("Try connect to the manager")
        manager.connect()
        Global = manager.Namespace()
    except:
        print("If exception we create a manager")
        background_thread = Thread(target=serve, args=(manager,))
        background_thread.start()
        manager.connect()

    print("Get the Namespace")
    Global = manager.Namespace()
    parquet = ParquetDataframe(manager,Global)
    parquet = ParquetDataframe(manager, Global)
    #procs = [Process(target=func, args=(parquet,)) for i in range(10)]

    #for p in procs: p.start()
    #for p in procs: p.join()

Tags: infromimportselflockdffordef