java是发布者/订阅者模式的并发实现
我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有想法了
有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要以正确的顺序处理每个对象一次,并且只处理一次。发布者和每个订阅者在各自的线程中运行
在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入订阅者的每个队列中。这可以正常工作,但如果订阅服务器的任何队列已满,发布服务器将被阻止。这会导致性能下降,因为每个订阅者处理对象的时间不同
然后在另一个实现中,发布者将对象保存在自己的队列中。与该对象一起,一个原子整数计数器与该对象关联,并与那里的订阅者数量相关联。然后,每个订阅者窥视队列并减少计数器,当计数器达到零时,将其从队列中移除
通过这种方式,发布者可以免于阻塞,但现在订阅者需要等待彼此处理对象,从队列中移除对象,然后才能窥视下一个对象
有更好的方法吗?我想这应该是一种很常见的模式
# 1 楼答案
您的“多队列”实现就是一条出路。我认为你不必担心一个完整的队列阻塞了制作人,因为总的完成时间不会受到影响。假设有三个消费者,两个以每秒1次的速度消费,第三个以每5秒1次的速度消费,同时生产者以每2秒1次的速度生产。最后,第三个队列将被填满,因此制作人将阻止它,并停止将项目放入第一和第二个队列。有很多方法可以解决这个问题,但它们不会改变这样一个事实,即第三个消费者总是成为瓶颈。如果你生产/消费100件商品,那么这将至少需要500秒,因为第三个消费者(5秒乘以100件商品),即使第一个和第二个消费者在200秒后完成(因为你做了一些聪明的事情,允许生产者在第三个队列已满后继续填充他们的队列),或者他们在500秒后完成(因为生产者在第三个队列上阻塞),情况也是如此
# 2 楼答案
明确地
这就是路。 您可以使用线程方法将其放入队列中。。。因此,如果一个队列已满,publisher将不会等待
例如
s1 s2 s3是订阅者,
addToQueue
是每个订阅者中添加到响应队列的方法。addQueue
方法是等待队列非空。。所以对addQueue
的调用将是一个阻塞调用ideally synchronised code
然后在publisher中,您可以执行类似于以下代码的操作
注意:代码可能未按原样处于工作状态。。但应该给你一个想法
# 3 楼答案
我会改变这个架构。我最初考虑了每个订阅者的队列,但我不喜欢这种机制。例如,如果第一个订阅服务器的运行时间更长,那么所有作业都将在该队列中结束,而您将只执行一个线程的工作
因为您必须按顺序运行订阅服务器,所以我有一个线程池,通过所有订阅服务器运行每条消息。对用户的呼叫将需要可重入,这可能是不可能的
因此,您将有一个10个线程的池(比方说),每个线程从发布者的队列中退出,并执行如下操作: