我想检查是否存在一个消费者/工作者来消费我即将发送的消息。
如果没有任何Worker,我将启动一些Worker(消费者和发布者都在一台机器上),然后开始发布消息。
如果有一个类似connection.check_if_has_consumers
的函数,我会像这样实现它-
import pika
import workers
# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
# start the workers in other processes, using python's `multiprocessing`
workers.start_workers()
# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
properties=pika.BasicProperties(delivery_mode=2))
connection.close()
但我在pika中找不到任何具有check_if_has_consumers
功能的函数。
有没有什么方法可以实现这一点,使用pika?或者,直接和兔子说话?
我不完全确定,但我真的认为RabbitMQ会知道订阅不同队列的消费者的数量,因为它确实向他们发送消息并接受acks
我3小时前才开始学拉比。。。欢迎任何帮助。。。
这是我写的workers.py代码,如果有帮助的话。。。。
import multiprocessing
import pika
def start_workers(num=3):
"""start workers as non-daemon processes"""
for i in xrange(num):
process = WorkerProcess()
process.start()
class WorkerProcess(multiprocessing.Process):
"""
worker process that waits infinitly for task msgs and calls
the `callback` whenever it gets a msg
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_working = multiprocessing.Event()
def run(self):
"""
worker method, open a channel through a pika connection and
start consuming
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='worker_queue', auto_delete=False,
durable=True)
# don't give work to one worker guy until he's finished
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='worker_queue')
# do what `channel.start_consuming()` does but with stopping signal
while len(channel._consumers) and not self.stop_working.is_set():
channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
return 0
def signal_exit(self):
"""exit when finished with current loop"""
self.stop_working.set()
def exit(self):
"""exit worker, blocks until worker is finished and dead"""
self.signal_exit()
while self.is_alive(): # checking `is_alive()` on zombies kills them
time.sleep(1)
def kill(self):
"""kill now! should not use this, might create problems"""
self.terminate()
self.join()
def callback(channel, method, properties, body):
"""pika basic consume callback"""
print 'GOT:', body
# do some heavy lifting here
result = save_to_database(body)
print 'DONE:', result
channel.basic_ack(delivery_tag=method.delivery_tag)
编辑:
我必须向前走,所以这里有一个解决办法,除非有更好的办法
因此,RabbitMQ有这些HTTP management apis,它们在打开management plugin之后工作,在httpapi页面的中间有
/api/connections - A list of all open connections.
/api/connections/name - An individual connection. DELETEing it will close the connection.
因此,如果我通过不同的连接名/用户连接我的Workers和我的Produces,我将能够检查Worker连接是否打开。。。(当工人死亡时可能会有问题…)
将等待更好的解决方案。。。
编辑:
刚刚在rabbitmq文档中找到了这个,但是在python中这样做是很麻烦的:
shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue 0
...done.
所以我可以做些什么
subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
黑。。。仍然希望pika有一些python函数来实现这一点。。。
谢谢
我也在调查这个。在阅读了源代码和文档之后,我在channel.py中发现了以下内容:
我自己的测试是成功的。我使用了以下通道对象,其中我的通道对象是self
实际上,我是在意外中发现这一点的,寻找不同的问题,但有一件事可以帮助您,那就是基本的发布函数,有一个默认为False的参数“Immediate”。
您可以做的一个想法是将Immediate标志设置为True,这将要求消费者立即使用它,而不是坐在队列中。如果一个工作进程不可使用该消息,它将返回一个错误,告诉您启动另一个工作进程。
根据系统的吞吐量,这可能会产生大量额外的工作线程,或者产生工作线程来替换已死亡的工作线程。对于前一个问题,您可以编写一个类似于管理的系统,该系统只需通过控制队列跟踪工作进程,在该系统中,您可以告诉类似于“运行者”的进程终止现在不再需要的工作进程的进程。
相关问题 更多 >
编程相关推荐