如果消息数量少于线程数量,该如何处理?

0 投票
0 回答
28 浏览
提问于 2025-04-12 07:36

抱歉我的英语不好。

在使用多线程处理RabbitMQ消息队列中的消息时,如果消息的数量少于线程的数量,应该怎么处理呢?

我的代码最多可以使用5个线程来处理。如果消息队列中有13个网址,程序运行结束时会有3个网址没有被处理。如果消息队列中有12个网址,那么会有2个网址没有被处理。

我对RabbitMQ不太了解,但我猜这个问题可能和RabbitMQ代码的逻辑有关。

async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
    try:        
        messagesList.append(message)
        # when message number =5
        if len(messagesList) == 5:
            messages_to_process = messagesList[:]  
            
            max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
                           messages_to_process}
                for future in as_completed(futures):
                    messageItem = futures[future]
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            
                            logger.info(f'{messageItem.delivery_tag}: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'{messageItem.delivery_tag}: {e}')
                        await messageItem.reject(requeue=True)
            linkList.clear()
            messagesList.clear()

        # when message number < 5
        remaining_messages = queue.declaration_result.message_count
        if remaining_messages > 0:
            messages_to_process = messagesList[:]
            max_workers = len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in messagesList}
                for future in as_completed(futures):
                    messageItem = messages_to_process.pop(0)
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            logger.info(f'{messageItem.delivery_tag}: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'{messageItem.delivery_tag}: {e}')
                        await messageItem.reject(requeue=True)
            messagesList.clear()

        await asyncio.sleep(0.1)

    except Exception as e:
        print(e)


async def main(loop):
    try:
        # connect
        connection = await aio_pika.connect_robust(host='XX.XX.X.XX', port=5672, login='admin', password='admin',
                                                   virtualhost='my_vhost', loop=loop)
        
        global channel
        channel = await connection.channel()
        # Will take no more than 10 messages in advance
        await channel.set_qos() #prefetch_count=5
        crawler_exchange = await channel.declare_exchange(name='crawler_exchange', type='fanout')
        
        queueName = "myqueue"
        global queue
        queue = await channel.declare_queue(queueName, durable=True)
        await queue.bind(crawler_exchange, routing_key="myqueue")

        
        rstqueueName = "Result"
        global resultQuequ
        resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
        
        global resultExchange
        resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
        
        await resultQuequ.bind(resultExchange, routing_key="allResult")

        # get message
        await queue.consume(processTask)
        logger.info(f"Waiting for messages at {queue.name}. To exit press CTRL+C")
        return connection

    except Exception as e:
        logger.error(f"failed: {e}")
        logger.error(traceback.format_exc())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    connection = loop.run_until_complete(main(loop))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("Received exit signal")
    finally:
        loop.run_until_complete(connection.close())
        loop.close()

我尝试在gpt4和claude3的帮助下修改我的代码,但他们的代码运行得不太好。我该怎么解决这个问题呢?谢谢。

我需要的是:当消息队列中的消息数量大于等于5时,应该有5个工作线程。当消息数量少于5时,工作线程的数量应该等于消息的数量。这样可以提高处理速度,并确保消息队列中没有未处理的网址。

0 个回答

暂无回答

撰写回答