nats.io的Python客户端关闭时不会刷新

2024-05-17 14:27:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个leetle测试程序,除非我在client.close()之前做一个client.flush(),否则它不会工作。对nats/aio/client.py的天真解读似乎表明close()做了某种{},所以我不明白为什么我的测试程序失败了。有必要总是flush()close()之前吗?示例程序似乎并没有说明这一点。在

我可以看到close()调用_flush_pending(),但这与flush()有很大不同:

  • flush()调用_send_ping()发送PING并等待PONG响应。_send_ping()直接写入self._io_writer,然后调用{}。

  • _flush_pending()将一个None(我想任何东西都可以)推到self._flush_queue。这可能会唤醒_flusher(),并使其将self._pending中的所有内容写入self._io_writer

  • publish()调用_send_command()将消息推送到self._pending,然后还调用_flush_pending()使{}写入所有内容。

测试程序:

#!/usr/bin/env python3.5

import asyncio
import nats.aio.client
import nats.aio.errors

async def send_message(loop):
    mq_url = "nats://nats:password@127.0.0.1:4222"
    client = nats.aio.client.Client()
    await client.connect(io_loop=loop, servers=[mq_url])
    await client.publish("test_subject", "test1".encode())
    #await client.flush()
    await client.close()

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send_message(loop))
    loop.close()

if __name__ == '__main__':
    main()

FWIW如果我发送了大量的消息,那么在某个时刻(还没有计算出确切的条件),就会发送消息。我们注意到了在处理一组文件并为文件中的每一行发送一条消息时的行为:一些文件正在通过,另一些则没有,结果是更大的文件(更多的行)使它成功。所以看起来好像有一些内部缓冲区被填满了,这会强制刷新。在


Tags: 文件ioimportselfclientloopsend消息