在多处理中共享公共变量

2024-06-01 02:38:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我有代码转换一个自定义文件格式到其他自定义文件格式。你知道吗

用于将工作划分为并行工作。一些除以进程计数逻辑将文件列表作为输入参数传递。你知道吗

代码:

def CustomFun1(self, ):
    start_time = time.time()
    custom1_files = os.listdir("/Some/location")
    self.total_custom1_count = custom1_files.__len__()
    self.custom2_ctr = 0
    self.updateReport(2, 1, self.total_custom_count, -1, "")
    self.custom1_2_custom2_cmd = "command to convert custom1 to custom2"

    process_count = 4
    if process_count>1 and self.total_custom1_count>process_count:
        import multiprocessing
        mid = self.total_custom1_count/process_count
        processes = []
        start_index = 0
        end_index = 0 
        for ii in range(1, process_count):
            end_index += mid
            pp = multiprocessing.Process(name='custom12custom2_%s_%s'%(ii, time.time()),\
                                       target=self.createCustom1ToCustom2,\
                                       args=(custom1_files[start_index:end_index], ))
            pp.daemon = False
            processes.append(pp)
            start_index = end_index

        pp = multiprocessing.Process(name='custom12custom2_%s_%s'%(ii, time.time()),\
                                     target=self.createCustom1ToCustom2,\
                                     args=(custom1_files[start_index:], ))
        pp.daemon = False
        processes.append(pp)
        for pp in processes:
            pp.start()
        for pp in processes:
            pp.join()
    else:
        self.createCustom1ToCustom2(custom1_files)
    t2 = time.time() - start_time
    self.updateReport(2, 2, self.total_custom1_count, self.custom2_ctr, t2)

def createCustom1ToCustom2(self, custom1_files):
    """ Create Custom2 from the Custom1. """
    try:
        for cnt, custom1_file in enumerate(custom1_files, 1):
            ret = os.system(self.custom1_2_custom2_cmd%(custom1_file, custom1_file.split('.')[0]))
            self.custom2_ctr += 1 
            if self.custom2_ctr%5==0:
                self.updateReport(2, 1, self.total_custom1_count, self.custom2_ctr, "")
    except:
        e = traceback.format_exc()

下面是一个函数,在该函数中,我编写了将多少Custom1类型的文件转换为Custom2类型的文件。你知道吗

报告变量:

self.report = [{"pn": "Extraction", "status": 0, "cnt": 0, "tt": 0},
               {"pn": "Basic conversion Generation", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0},
               {"pn": "Cutom1 to custom2", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0}
               ]
def updateReport(self, pos, status, cnt, cur_i, tt):
    if not self.reportLoc:
        return

    try:
        self.report[pos]["status"] = status
        self.report[pos]["cnt"] = cnt
        if tt:
            self.report[pos]["tt"] = datetime.fromtimestamp(tt).strftime('%H:%M:%S')

        self.report[pos]["cur_i"] = cur_i
        with open(self.reportLoc, "w") as fp:
            fp.write(simplejson.dumps(self.report))
    except Exception, e:
        e = traceback.format_exc()

多处理正在工作,并且也得到了预期的输出,这个过程需要30到40分钟来转换2000个文件,我需要显示30秒或1分钟后转换了多少个文件。为此,我需要在函数updateReport中编写的报告文件。你知道吗

  1. ^在由多处理运行的代码中,{}未更新意味着self.custom2_ctr的值在函数CustomFun1的末尾0。你知道吗
  2. 在函数updateReport中,代码剪切公共self.report 变量。如何锁定函数updateReport仅表示 一个进程可以一次剪切此函数?你知道吗

如果你需要更多的信息,请告诉我。你知道吗


Tags: 文件函数selfreportindextimecountfiles