Python:在多个线程中使用多个锁

0 投票
1 回答
6058 浏览
提问于 2025-04-18 12:26

情况是这样的:我有多个方法,这些方法之间可以同时运行,但每个方法都需要一把属于自己的锁,以防止它在运行期间被其他线程再次调用。这些锁是在初始化一个类并设置一些数据处理选项时建立的:

class InfrequentDataDaemon(object):
    """Empty holder class.

    It defines no behaviour of its own; ``DataProcessors.__init__`` passes it
    to ``addMethod`` when registering ``slow_process_types``.
    """
class FrequentDataDaemon(object):
    """Empty holder class.

    It defines no behaviour of its own; ``DataProcessors.__init__`` passes it
    to ``addMethod`` when registering ``fast_process_types``.
    """

def addMethod(name):
    """Return a decorator that attaches a function to ``name`` as a static method.

    Args:
        name: The object that receives the function. Despite the parameter
            name, the call sites in this file pass a class, not a string.

    Returns:
        A decorator taking a function ``f``. It stores ``f`` on ``name`` under
        ``f.__name__``, wrapped in ``staticmethod``, and returns ``f``
        unchanged.
    """
    def wrapper(f):
        # Attach to the object that was passed in; the previous code wrote to
        # an undefined global and raised NameError on every call.
        setattr(name, f.__name__, staticmethod(f))
        return f
    return wrapper
    
class DataProcessors(object):
    """Build one processing function per data type and register daemon hooks.

    NOTE: every processing function is guarded by the single class-level
    ``lock``. Processing is therefore serialised across *all* data types,
    not just within one type.
    """

    # One lock shared by every instance and every data type.
    lock = threading.Lock()

    def __init__(self, options):
        """Create the per-type processing functions and the daemon hooks.

        Args:
            options: Mapping that provides the keys ``'common_settings'``,
                ``'data_processing_configurations'`` and
                ``'data_processing_types'``.
        """
        self.common_settings = options['common_settings']

        # Configs for each processing method.
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        self.Data_Processing_Functions = {}

        # Each processing method is created as a separate function so that it
        # can be locked.
        for type_name in options['data_processing_types']:
            def bindFunction1(name):
                def func1(self, data=None, lock=None):
                    # Pick the config that belongs to this data type.
                    config = self.data_processing_configurations[data['type']]
                    with lock:
                        FetchDataBaseStuff(data['type'])
                        # Desired: never run this more than once at a time per
                        # data type, while still allowing several DoSomething
                        # calls in parallel as long as their types differ.
                        DoSomething(data, config)
                        WriteToDataBase(data['type'])
                func1.__name__ = "Processing_for_{}".format(name)
                # Register the function in the lookup dictionary.
                self.Data_Processing_Functions[func1.__name__] = func1
            bindFunction1(type_name)

        # Hooks added to the daemons; they decide whether a processing
        # function has to be started for an incoming piece of data.
        def fast_process_types(data):
            if example_condition is not True:
                return
            # Ignore data types we are not configured to process.
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            # A bare ``lock`` is not visible from a nested function; the class
            # attribute has to be reached through ``self``.
            threading.Thread(target=target, args=(self, data, self.lock)).start()

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            # Ignore data types we are not configured to process.
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=target, args=(self, data, self.lock)).start()

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)

我的想法是给每个方法加锁,确保在DataProcessors.Data_Processing_Functions中,每个方法一次只能被一个线程访问(其他线程需要排队等候)。那么,应该如何设置这个锁才能实现这个效果呢?

1 个回答

1

我不太确定你想要做什么,不过你能不能为每种类型创建一个单独的 threading.Lock 对象呢?

class DataProcessors(object):
    """Build one lock-guarded processing function per data type.

    Each data type gets its own ``threading.Lock`` in ``self.locks``. The
    processing function for a type holds that type's lock while it works, so
    it is never run twice at the same time for one type, and functions for
    different types do not block each other.
    """

    def __init__(self, options):
        """Create the per-type locks, processing functions and daemon hooks.

        Args:
            options: Mapping that provides the keys ``'common_settings'``,
                ``'data_processing_configurations'`` and
                ``'data_processing_types'``.
        """
        self.common_settings = options['common_settings']

        # Configs for each processing method.
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        self.Data_Processing_Functions = {}
        # Maps data type -> the lock guarding that type's processing.
        self.locks = {}

        # Each processing method is created as a separate function so that it
        # can be locked.
        for type_name in options['data_processing_types']:
            self.locks[type_name] = threading.Lock()

            def bindFunction1(name):
                def func1(self, data=None):
                    # Pick the config that belongs to this data type.
                    config = self.data_processing_configurations[data['type']]
                    # Only calls for the same data type contend for this lock.
                    with self.locks[data['type']]:
                        FetchDataBaseStuff(data['type'])
                        DoSomething(data, config)
                        WriteToDataBase(data['type'])
                func1.__name__ = "Processing_for_{}".format(name)
                # Register the function in the lookup dictionary.
                self.Data_Processing_Functions[func1.__name__] = func1
            bindFunction1(type_name)

        # Hooks added to the daemons; they decide whether a processing
        # function has to be started for an incoming piece of data.
        def fast_process_types(data):
            if example_condition is not True:
                return
            # Ignore data types we are not configured to process.
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=target, args=(self, data)).start()

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            # Ignore data types we are not configured to process.
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=target, args=(self, data)).start()

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)

撰写回答