如果消息数量少于线程数量,该如何处理?
抱歉我的英语不好。
在使用多线程处理RabbitMQ消息队列中的消息时,如果消息的数量少于线程的数量,应该怎么处理呢?
我的代码最多可以使用5个线程来处理。如果消息队列中有13个网址,程序运行结束时会有3个网址没有被处理。如果消息队列中有12个网址,那么会有2个网址没有被处理。
我对RabbitMQ不太了解,但我猜这个问题可能和RabbitMQ代码的逻辑有关。
async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
try:
messagesList.append(message)
# when message number =5
if len(messagesList) == 5:
messages_to_process = messagesList[:]
max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
messages_to_process}
for future in as_completed(futures):
messageItem = futures[future]
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
logger.info(f'{messageItem.delivery_tag}: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'{messageItem.delivery_tag}: {e}')
await messageItem.reject(requeue=True)
linkList.clear()
messagesList.clear()
# when message number < 5
remaining_messages = queue.declaration_result.message_count
if remaining_messages > 0:
messages_to_process = messagesList[:]
max_workers = len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in messagesList}
for future in as_completed(futures):
messageItem = messages_to_process.pop(0)
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
logger.info(f'{messageItem.delivery_tag}: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'{messageItem.delivery_tag}: {e}')
await messageItem.reject(requeue=True)
messagesList.clear()
await asyncio.sleep(0.1)
except Exception as e:
print(e)
async def main(loop):
try:
# connect
connection = await aio_pika.connect_robust(host='XX.XX.X.XX', port=5672, login='admin', password='admin',
virtualhost='my_vhost', loop=loop)
global channel
channel = await connection.channel()
# Will take no more than 10 messages in advance
await channel.set_qos() #prefetch_count=5
crawler_exchange = await channel.declare_exchange(name='crawler_exchange', type='fanout')
queueName = "myqueue"
global queue
queue = await channel.declare_queue(queueName, durable=True)
await queue.bind(crawler_exchange, routing_key="myqueue")
rstqueueName = "Result"
global resultQuequ
resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
global resultExchange
resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
await resultQuequ.bind(resultExchange, routing_key="allResult")
# get message
await queue.consume(processTask)
logger.info(f"Waiting for messages at {queue.name}. To exit press CTRL+C")
return connection
except Exception as e:
logger.error(f"failed: {e}")
logger.error(traceback.format_exc())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
loop.run_forever()
except KeyboardInterrupt:
logger.info("Received exit signal")
finally:
loop.run_until_complete(connection.close())
loop.close()
我尝试在gpt4和claude3的帮助下修改我的代码,但他们的代码运行得不太好。我该怎么解决这个问题呢?谢谢。
我需要的是:当消息队列中的消息数量大于等于5时,应该有5个工作线程。当消息数量少于5时,工作线程的数量应该等于消息的数量。这样可以提高处理速度,并确保消息队列中没有未处理的网址。
0 个回答
暂无回答