有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java是发布者/订阅者模式的并发实现

我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有想法了

有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要以正确的顺序处理每个对象一次,并且只处理一次。发布者和每个订阅者在各自的线程中运行

在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入订阅者的每个队列中。这可以正常工作,但如果订阅服务器的任何队列已满,发布服务器将被阻止。这会导致性能下降,因为每个订阅者处理对象的时间不同

然后在另一个实现中,发布者将对象保存在自己的队列中。与该对象一起,一个原子整数计数器与该对象关联,并与那里的订阅者数量相关联。然后,每个订阅者窥视队列并减少计数器,当计数器达到零时,将其从队列中移除

通过这种方式,发布者可以免于阻塞,但现在订阅者需要等待彼此处理对象,从队列中移除对象,然后才能窥视下一个对象

有更好的方法吗?我想这应该是一种很常见的模式


共 (3) 个答案

  1. # 1 楼答案

    您的“多队列”实现就是一条出路。我认为你不必担心一个完整的队列阻塞了制作人,因为总的完成时间不会受到影响。假设有三个消费者,两个以每秒1次的速度消费,第三个以每5秒1次的速度消费,同时生产者以每2秒1次的速度生产。最后,第三个队列将被填满,因此制作人将阻止它,并停止将项目放入第一和第二个队列。有很多方法可以解决这个问题,但它们不会改变这样一个事实,即第三个消费者总是成为瓶颈。如果你生产/消费100件商品,那么这将至少需要500秒,因为第三个消费者(5秒乘以100件商品),即使第一个和第二个消费者在200秒后完成(因为你做了一些聪明的事情,允许生产者在第三个队列已满后继续填充他们的队列),或者他们在500秒后完成(因为生产者在第三个队列上阻塞),情况也是如此

  2. # 2 楼答案

    明确地

    each subscriber has its own blocking queue, and the publisher put objects into each of the subscriber's queue.`

    这就是路。 您可以使用线程方法将其放入队列中。。。因此,如果一个队列已满,publisher将不会等待

    例如

    s1 s2 s3是订阅者,addToQueue是每个订阅者中添加到响应队列的方法。 addQueue方法是等待队列非空。。所以对addQueue的调用将是一个阻塞调用ideally synchronised code

    然后在publisher中,您可以执行类似于以下代码的操作

    注意:代码可能未按原样处于工作状态。。但应该给你一个想法

    List<subscriber> slist;// Assume its initialised
    public void publish(final String message){
    
        for (final subscriber s: slist){
    
    
              Thread t=new Thread(new Runnable(){
                 public void run(){
                    s.addToQueue(message);
                 }
               });
    
          t.start();
        }
    
    }
    
  3. # 3 楼答案

    There is 1 publisher and N subscribers, the publisher publish objects then each subscriber need to process the each of the objects once and only once in the correct order. The publisher and each subscriber run in their own thread.

    我会改变这个架构。我最初考虑了每个订阅者的队列,但我不喜欢这种机制。例如,如果第一个订阅服务器的运行时间更长,那么所有作业都将在该队列中结束,而您将只执行一个线程的工作

    因为您必须按顺序运行订阅服务器,所以我有一个线程池,通过所有订阅服务器运行每条消息。对用户的呼叫将需要可重入,这可能是不可能的

    因此,您将有一个10个线程的池(比方说),每个线程从发布者的队列中退出,并执行如下操作:

    public void run() {
        while (!shutdown && !Thread.currentThread().isInterrupted()) {
            Article article = publisherQueue.take();
            for (Subscriber subscriber : subscriberList) {
               subscriber.process(article);
            }
        }
    }