高并发系统的事件总线

geeteventbus的Python项目详细描述


用于并发编程的eventbus

geeteventbus是一个允许发布-订阅式通信的库。组件之间不需要相互注册。它的灵感来自于一个java库,来自google的guava eventbus。但它与番石榴eventbus图书馆并不完全相同。

  • GeetEventBus简化了来自发布者和订阅者的事件处理。
  • 发布服务器和订阅服务器不需要创建线程来并发处理事件。
  • eventbus可以是synchronus,其中事件是从发布事件的同一线程传递的
  • 事件可以按发布顺序发送给子卷
  • 订阅服务器可以声明为线程安全的,在这种情况下,可以同时调用同一订阅服务器来处理多个事件
  • 没有订阅服务器的事件将被注册,但事件总线只会对其进行讨论。
  • EventBus不用于进程间通信。发布者和订阅者必须在同一进程上运行

基本工作

  1. 我们创建一个eventbus

    fromgeeteventbus.eventbusimporteventbuseb=eventbus()

    这将创建具有默认值的eventbus。默认事件总线将具有以下特征:

    1. the maximum queued event limit is set to 10000
    2. number of executor thread is 8
    3. the subscribers will be called asynchronously
    4. subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously on different threads
  2. 创建订阅服务器的子类并重写进程方法。创建此类的对象并将其注册到EventBus以接收具有特定主题的消息:

    fromgeeteventbus.subscriberimportsubscriberfromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventclassmysubscriber(subscriber):defprocess(self,eventobj):ifnotisinstance(eventobj,event):print('Invalid object type is passed.')returntopic=eventobj.get_topic()data=eventobj.get_data()print('Processing event with TOPIC: %s, DATA: %s'%(topic,data))subscr=mysubscriber()eb.register_consumer(subscr,'an_important_topic')
  3. 在EventBus上发布一些主题为“重要主题”的事件。

    fromgeeteventbus.eventimporteventeobj1=('an_important_topic','This is some data for the event 1')eobj2=('an_important_topic','This is some data for the event 2')eobj3=('an_important_topic','This is some data for the event 3')eobj3=('an_important_topic','This is some data for the event 4')eb.post(eobj1)eb.post(eobj2)eb.post(eobj3)eb.post(eobj4)
  4. 在退出进程

    之前,我们可以优雅地关闭EnvestBUS。
    eb.shutdown()

完整的示例如下:

fromtimeimportsleepfromgeeteventbus.subscriberimportsubscriberfromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventclassmysubscriber(subscriber):defprocess(self,eventobj):ifnotisinstance(eventobj,event):print('Invalid object type is passed.')returntopic=eventobj.get_topic()data=eventobj.get_data()print('Processing event with TOPIC: %s, DATA: %s'%(topic,data))eb=eventbus()subscr=mysubscriber()eb.register_consumer(subscr,'an_important_topic')eobj1=event('an_important_topic','This is some data for the event 1')eobj2=event('an_important_topic','This is some data for the event 2')eobj3=event('an_important_topic','This is some data for the event 3')eobj4=event('an_important_topic','This is some data for the event 4')eb.post(eobj1)eb.post(eobj2)eb.post(eobj3)eb.post(eobj4)eb.shutdown()sleep(2)

下面给出了一个更详细的例子。订阅者(counter_aggregator)聚合 一组计数器。它将自己注册到EventBus以接收 计数器(主题)。一组生产者更新计数器和post事件的值,描述 事件总线的计数器:

fromthreadingimportLock,Threadfromtimeimportsleep,timefromgeeteventbus.eventbusimporteventbusfromgeeteventbus.eventimporteventfromgeeteventbus.subscriberimportsubscriberfromrandomimportrandintclasscounter_aggregator(subscriber,Thread):'''
    Aggregator for a set of counters. Multiple threads updates the counts which
    are aggregated by this class and output the aggregated value periodically.
    '''def__init__(self,counter_names):Thread.__init__(self)self.counter_names=counter_namesself.locks={}self.counts={}self.keep_running=Trueself.collect_times={}forcounterincounter_names:self.locks[counter]=Lock()self.counts[counter]=0self.collect_times[counter]=time()defprocess(self,eobj):'''
        Process method calls with the event object eobj. eobj has the counter name as the topic
        and an int count as the value for the counter.
        '''counter_name=eobj.get_topic()ifcounter_namenotinself.counter_names:returncount=eobj.get_data()withself.locks[counter_name]:self.counts[counter_name]+=countdefstop(self):self.keep_running=Falsedef__call__(self):'''
        Keep outputing the aggregated counts every 2 seconds
        '''whileself.keep_running:sleep(2)forcounter_nameinself.counter_names:withself.locks[counter_name]:print('Change for counter %s = %d, in last %f secs'%(counter_name,self.counts[counter_name],time()-self.collect_times[counter_name]))self.counts[counter_name]=0self.collect_times[counter_name]=time()print('Aggregator exited')classcount_producer:'''
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    '''def__init__(self,counters,ebus):self.counters=countersself.ebus=ebusself.keep_running=Trueself.num_counter=len(counters)defstop(self):self.keep_running=Falsedef__call__(self):whileself.keep_running:ev=event(self.counters[randint(0,self.num_counter-1)],randint(1,100))ebus.post(ev)sleep(0.02)print('producer exited')if__name__=='__main__':ebus=eventbus()counters=['c1','c2','c3','c4']subcr=counter_aggregator(counters)producer=count_producer(counters,ebus)forcounterincounters:ebus.register_consumer(subcr,counter)threads=[]i=30whilei>0:threads.append(Thread(target=producer))i-=1aggregator_thread=Thread(target=subcr)aggregator_thread.start()forthrdinthreads:thrd.start()sleep(20)producer.stop()subcr.stop()sleep(2)ebus.shutdown()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Android深度链接的java模式匹配   jstl在JSP中添加外部资源(CSS/JavaScript/images等)   Java开关环路中断故障   java Appengine通道API开发服务器vs生产   java断言等于Junit中的两个列表   java用“真实数据”建立测试系统   Java中使用番石榴BiMap的词典   java试图在圆周上绘制位图,在实现中找到一些偏移   json Java curl响应   java使用hibernate或JPA获取过程输出的列名   java从Android移动应用程序获取电话号码   java访问嵌套的JsonNode元素,并用逗号分隔   未使用注释的java未经检查或不安全操作   控制台中的java输出为空   java使用Android应用程序的自定义适配器将项目动态添加到列表视图   java如何解决对接口中静态方法的需求?   尝试从其他活动调用数组字符串时发生java错误   仅设备上的java Android NDK致命信号11(SIGSEGV)