Why doesn't pool.map start a new process?

Posted 2024-05-23 17:53:28

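I am trying to run a member method in a separate child process. The member method is actually a callback dispatcher loop, inside which each registered callback is executed.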

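The problem I am facing is that the callbacks never get called. The method responsible for spawning the new process does get executed, but I get no output or signal from the actual dispatcher loop or from the callbacks themselves.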

The new process is spawned using the following snippet:

def run_callback_disptacher(self):
    pool = pathos.multiprocessing.Pool(1)
    pool.map(self.execute_callbacks, [])
    print('run_callback_disptacher executed')

The dispatcher looks like this:

def execute_callbacks(self):
    print(f'dispatcher list: {self.callback_list}')
    while True:
        for callback in self.callback_list:
            callback()

I am using the pathos.multiprocessing module on Windows 10. Here is a minimal example that demonstrates the issue:

import time
import torch 
import torchvision
from torchvision import models
from torch.utils import mkldnn as mkldnn_utils
import pathos

class SomeClass():
    def __init__(self, model_name='r18', use_jit=False, use_mkldnn=False, device='cpu'):
        self.model_name = model_name
        self.use_jit = use_jit
        self.use_mkldnn = use_mkldnn 
        self.device = device
        self.callback_list = []
        self.is_running = False
        self._init_model()

    def _init_model(self):
        if self.model_name == 'r18':
            self.model = models.resnet18(pretrained=True)
        elif self.model_name == 'r50':
            self.model = models.resnet50(pretrained=True)
        else:
            raise Exception(f"Model name: '{self.model_name}' is not recognized.")

        self.model = self.model.to(self.device)
        self.model.eval()

        if self.use_mkldnn:
            self.model = mkldnn_utils.to_mkldnn(self.model)
        if self.use_jit: 
            self.model = self.load_jit_model()

    def load_jit_model(self, jit_path = 'model.jit'):
        # torch.rand already returns a tensor, so no torch.tensor() wrapper is needed
        dummy_input = torch.rand(size=(1, 3, 224, 224))
        model = torch.jit.trace(self.model, dummy_input)
        torch.jit.save(model, jit_path)
        return torch.jit.load(jit_path)

    def add_callback(self, callback):
        self.callback_list.append(callback)

    def remove_callback(self, callback):
        self.callback_list.remove(callback)

    def get_callbacks(self):
        return self.callback_list

    def execute_callbacks(self):
        print(f'dispatcher list: {self.callback_list}')
        while True:
            for callback in self.callback_list:
                callback()

    def start(self):
        self.is_running = True
        while self.is_running:
            # simulating a generic operation here
            time.sleep(0.2)
        print('start ended!')

    def stop(self):
        self.is_running = False

    def run_callback_disptacher(self):
        pool = pathos.multiprocessing.Pool(1)
        pool.map(self.execute_callbacks, [])
        print('run_callback_disptacher executed')

And this is how it is called:

import threading
import time
from minimal_example import SomeClass

def simple_callback():
    print('hello from simple callback')

def start():
    obj = SomeClass(model_name='r18', use_jit=False, use_mkldnn=False, device='cpu')
    obj.add_callback(simple_callback)
    obj.run_callback_disptacher()
    starter = threading.Thread(target=obj.start)
    starter.start()
    time.sleep(5)
    print(obj.get_callbacks())
    obj.stop()
    print('Done!')

if __name__ == '__main__':
    start()

What am I missing?
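
One likely cause, offered as a sketch rather than a confirmed diagnosis: `pool.map(func, iterable)` calls `func` once per element of `iterable`, so with an empty list there is nothing to dispatch and `map` returns an empty list right away. The example below uses the standard library's `multiprocessing.pool.ThreadPool` instead of `pathos` (an assumption, made so that it runs without extra dependencies; `pathos` is expected to behave the same way, but that is not verified here). `record` is a hypothetical stand-in for `execute_callbacks`.

```python
from multiprocessing.pool import ThreadPool

# Every invocation of the worker function is recorded here.
calls = []

def record(item):
    """Hypothetical stand-in for the dispatcher: remember that it was called."""
    calls.append(item)
    return item * 2

with ThreadPool(1) as pool:
    # Empty iterable: there is no element to hand to `record`,
    # so the function is never invoked.
    empty_result = pool.map(record, [])
    calls_after_empty = len(calls)

    # One element: `record` is invoked exactly once, with that element.
    one_result = pool.map(record, [21])
    calls_after_one = len(calls)

print(empty_result, calls_after_empty)
print(one_result, calls_after_one)
```

If this is what is happening in the question, passing a one-element list would not be enough on its own: `map` hands each element to the function as an argument, which `execute_callbacks(self)` does not accept, and `map` blocks until the function returns, which a `while True` loop never does. A non-blocking call, such as `apply_async` on the standard-library pool, may be closer to the intent.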

