高并发系统的事件总线
geeteventbus的Python项目详细描述
用于并发编程的eventbus
geeteventbus是一个允许发布-订阅式通信的库。组件之间不需要相互注册。它的灵感来自于一个java库,来自google的guava eventbus。但它与番石榴eventbus图书馆并不完全相同。
- GeetEventBus简化了来自发布者和订阅者的事件处理。
- 发布服务器和订阅服务器不需要创建线程来并发处理事件。
- eventbus可以是synchronus,其中事件是从发布事件的同一线程传递的
- 事件可以按发布顺序发送给子卷
- 订阅服务器可以声明为线程安全的,在这种情况下,可以同时调用同一订阅服务器来处理多个事件
- 没有订阅服务器的事件将被注册,但事件总线只会对其进行讨论。
- EventBus不用于进程间通信。发布者和订阅者必须在同一进程上运行
基本工作
我们创建一个eventbus
fromgeeteventbus.eventbusimporteventbuseb=eventbus()
这将创建具有默认值的eventbus。默认事件总线将具有以下特征:
- the maximum queued event limit is set to 10000
- number of executor thread is 8
- the subscribers will be called asynchronously
- subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously on different threads
创建订阅服务器的子类并重写进程方法。创建此类的对象并将其注册到EventBus以接收具有特定主题的消息:
fromgeeteventbus.subscriberimportsubscriberfromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventclassmysubscriber(subscriber):defprocess(self,eventobj):ifnotisinstance(eventobj,event):print('Invalid object type is passed.')returntopic=eventobj.get_topic()data=eventobj.get_data()print('Processing event with TOPIC: %s, DATA: %s'%(topic,data))subscr=mysubscriber()eb.register_consumer(subscr,'an_important_topic')
在EventBus上发布一些主题为“重要主题”的事件。
fromgeeteventbus.eventimporteventeobj1=('an_important_topic','This is some data for the event 1')eobj2=('an_important_topic','This is some data for the event 2')eobj3=('an_important_topic','This is some data for the event 3')eobj3=('an_important_topic','This is some data for the event 4')eb.post(eobj1)eb.post(eobj2)eb.post(eobj3)eb.post(eobj4)
在退出进程之前,我们可以优雅地关闭EnvestBUS。
eb.shutdown()
完整的示例如下:
fromtimeimportsleepfromgeeteventbus.subscriberimportsubscriberfromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventclassmysubscriber(subscriber):defprocess(self,eventobj):ifnotisinstance(eventobj,event):print('Invalid object type is passed.')returntopic=eventobj.get_topic()data=eventobj.get_data()print('Processing event with TOPIC: %s, DATA: %s'%(topic,data))eb=eventbus()subscr=mysubscriber()eb.register_consumer(subscr,'an_important_topic')eobj1=event('an_important_topic','This is some data for the event 1')eobj2=event('an_important_topic','This is some data for the event 2')eobj3=event('an_important_topic','This is some data for the event 3')eobj4=event('an_important_topic','This is some data for the event 4')eb.post(eobj1)eb.post(eobj2)eb.post(eobj3)eb.post(eobj4)eb.shutdown()sleep(2)
下面给出了一个更详细的例子。订阅者(counter_aggregator)聚合 一组计数器。它将自己注册到EventBus以接收 计数器(主题)。一组生产者更新计数器和post事件的值,描述 事件总线的计数器:
fromthreadingimportLock,Threadfromtimeimportsleep,timefromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventfromgeeteventbus.subscriberimportsubscriberfromrandomimportrandintclasscounter_aggregator(subscriber,Thread):''' Aggregator for a set of counters. Multiple threads updates the counts which are aggregated by this class and output the aggregated value periodically. '''def__init__(self,counter_names):Thread.__init__(self)self.counter_names=counter_namesself.locks={}self.counts={}self.keep_running=Trueself.collect_times={}forcounterincounter_names:self.locks[counter]=Lock()self.counts[counter]=0self.collect_times[counter]=time()defprocess(self,eobj):''' Process method calls with the event object eobj. eobj has the counter name as the topic and an int count as the value for the counter. '''counter_name=eobj.get_topic()ifcounter_namenotinself.counter_names:returncount=eobj.get_data()withself.locks[counter_name]:self.counts[counter_name]+=countdefstop(self):self.keep_running=Falsedef__call__(self):''' Keep outputing the aggregated counts every 2 seconds '''whileself.keep_running:sleep(2)forcounter_nameinself.counter_names:withself.locks[counter_name]:print('Change for counter %s = %d, in last %f secs'%(counter_name,self.counts[counter_name],time()-self.collect_times[counter_name]))self.counts[counter_name]=0self.collect_times[counter_name]=time()print('Aggregator exited')classcount_producer:''' Producer for counters. Every 0.02 seconds post the "updated" value for a counter randomly '''def__init__(self,counters,ebus):self.counters=countersself.ebus=ebusself.keep_running=Trueself.num_counter=len(counters)defstop(self):self.keep_running=Falsedef__call__(self):whileself.keep_running:ev=event(self.counters[randint(0,self.num_counter-1)],randint(1,100))ebus.post(ev)sleep(0.02)print('producer exited')if__name__=='__main__':ebus=eventbus()counters=['c1','c2','c3','c4']subcr=counter_aggregator(counters)producer=count_producer(counters,ebus)forcounterincounters:ebus.register_consumer(subcr,counter)threads=[]i=30whilei>0:threads.append(Thread(target=producer))i-=1aggregator_thread=Thread(target=subcr)aggregator_thread.start()forthrdinthreads:thrd.start()sleep(20)producer.stop()subcr.stop()sleep(2)ebus.shutdown()