Python:在多线程中使用多个锁
情况是这样的:我有多个方法,这些方法可能会同时运行,但每个方法都需要有自己的锁,以防止它在运行期间被其他线程再次调用。这个锁是在初始化一个类、并传入一些数据处理选项时建立的:
class InfrequentDataDaemon(object):
    """Empty class that is passed to addMethod together with slow_process_types."""
class FrequentDataDaemon(object):
    """Empty class that is passed to addMethod together with fast_process_types."""
def addMethod(name):
    """Return a decorator that attaches a function to ``name`` as a static method.

    Args:
        name: The object that receives the function (the call sites in this
            file pass a daemon class). Despite the parameter name, it is the
            target object itself, not a string.

    Returns:
        A decorator that stores the decorated function on ``name`` under the
        function's own ``__name__`` and returns the function unchanged.
    """
    def wrapper(f):
        # The target is the ``name`` argument; the previous code wrote to an
        # unrelated global called ``processor`` and ignored the argument.
        # staticmethod() keeps instances of the target from being passed as
        # an implicit first argument when ``f`` is called through them.
        setattr(name, f.__name__, staticmethod(f))
        return f
    return wrapper
class DataProcessors(object):
    """Create one processing function per data type and register daemon hooks.

    Every processing function is stored in ``Data_Processing_Functions`` under
    the key ``"Processing_for_<type>"``. The hooks registered on the daemon
    classes start a new thread running the matching processing function.
    """

    # One lock shared by all instances and all data types. Because every
    # processing function takes this same lock, processing for *different*
    # data types is serialized as well.
    lock = threading.Lock()

    def __init__(self, options):
        """Store the options, build the processing functions, register hooks.

        Args:
            options: Mapping that provides the keys ``'common_settings'``,
                ``'data_processing_configurations'`` (indexed by data type)
                and ``'data_processing_types'`` (iterated and used for
                membership tests).
        """
        self.common_settings = options['common_settings']
        # Configs for each processing method
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        # Spelled exactly as it is looked up below; a misspelled attribute
        # name here made every registration fail with AttributeError.
        self.Data_Processing_Functions = {}

        def bindFunction1(name):
            """Create and register the processing function for type ``name``."""
            def func1(self, data=None, lock=None):
                # Fall back to the shared lock so a caller that omits ``lock``
                # does not hit ``with None``.
                if lock is None:
                    lock = self.lock
                # Pick the config that belongs to this data type.
                config = self.data_processing_configurations[data['type']]
                with lock:
                    FetchDataBaseStuff(data['type'])
                    # This must not run more than once at a time per data
                    # processing type, but it is fine for several to run at
                    # once as long as each data type is different.
                    DoSomething(data, config)
                    WriteToDataBase(data['type'])
            func1.__name__ = "Processing_for_{}".format(name)
            # Add this function to the dictionary object.
            self.Data_Processing_Functions[func1.__name__] = func1

        # Each processing method is created as a separate function so that it
        # can be locked.
        for data_type in options['data_processing_types']:
            bindFunction1(data_type)

        def start_processing(data):
            """Start a thread for ``data`` if its type is one we process."""
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions[
                "Processing_for_{}".format(data['type'])
            ]
            # ``self.lock`` resolves to the class-level lock; a bare ``lock``
            # is not visible from inside this closure.
            threading.Thread(target=target, args=(self, data, self.lock)).start()

        # Hooks added to the daemons; they check whether our data processors
        # need to be called.
        # NOTE(review): example_condition and some_other_condition are not
        # defined in the visible source - confirm where they come from.
        def fast_process_types(data):
            if example_condition is not True:
                return
            start_processing(data)

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            start_processing(data)

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)
我的想法是给每个方法加锁,确保 `DataProcessors.Data_Processing_Functions` 中的每个方法一次只能被一个线程访问(其他线程需要排队等候)。那么,应该如何设置这个锁才能实现这个效果呢?
1 个回答
1
我不太确定你想要做什么,不过你能不能为每种类型创建一个单独的 `threading.Lock` 对象呢?
class DataProcessors(object):
    """Create one processing function and one lock per data type.

    Every processing function is stored in ``Data_Processing_Functions`` under
    the key ``"Processing_for_<type>"`` and takes the lock stored in
    ``locks`` for the type of the data it is given. Two runs for the same
    type therefore wait for each other, while runs for different types use
    different locks.
    """

    def __init__(self, options):
        """Store the options, build functions and locks, register hooks.

        Args:
            options: Mapping that provides the keys ``'common_settings'``,
                ``'data_processing_configurations'`` (indexed by data type)
                and ``'data_processing_types'`` (iterated and used for
                membership tests; its items are used as dictionary keys).
        """
        self.common_settings = options['common_settings']
        # Configs for each processing method
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        # Spelled exactly as it is looked up below; a misspelled attribute
        # name here made every registration fail with AttributeError.
        self.Data_Processing_Functions = {}
        self.locks = {}

        def bindFunction1(name):
            """Create and register the processing function for type ``name``."""
            def func1(self, data=None):
                # Pick the config that belongs to this data type.
                config = self.data_processing_configurations[data['type']]
                # Only the lock of this data's type is held while working.
                with self.locks[data['type']]:
                    FetchDataBaseStuff(data['type'])
                    DoSomething(data, config)
                    WriteToDataBase(data['type'])
            func1.__name__ = "Processing_for_{}".format(name)
            # Add this function to the dictionary object.
            self.Data_Processing_Functions[func1.__name__] = func1

        # Each processing method is created as a separate function so that it
        # can be locked.
        for data_type in options['data_processing_types']:
            self.locks[data_type] = threading.Lock()
            bindFunction1(data_type)

        def start_processing(data):
            """Start a thread for ``data`` if its type is one we process."""
            if data['type'] not in self.data_processing_types:
                return
            target = self.Data_Processing_Functions[
                "Processing_for_{}".format(data['type'])
            ]
            threading.Thread(target=target, args=(self, data)).start()

        # Hooks added to the daemons; they check whether our data processors
        # need to be called.
        # NOTE(review): example_condition and some_other_condition are not
        # defined in the visible source - confirm where they come from.
        def fast_process_types(data):
            if example_condition is not True:
                return
            start_processing(data)

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            start_processing(data)

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)