我有一个leetle测试程序,除非我在client.close()
之前做一个client.flush()
,否则它不会工作。对nats/aio/client.py的天真解读似乎表明close()
做了某种{flush()
在close()
之前吗?示例程序似乎并没有说明这一点。在
我可以看到close()
调用_flush_pending()
,但这与flush()
有很大不同:
flush()
调用_send_ping()
发送PING
并等待PONG
响应。_send_ping()
直接写入self._io_writer
,然后调用{
_flush_pending()
将一个None(我想任何东西都可以)推到self._flush_queue
。这可能会唤醒_flusher()
,并使其将self._pending
中的所有内容写入self._io_writer
。
publish()
调用_send_command()
将消息推送到self._pending
,然后还调用_flush_pending()
使{
测试程序:
#!/usr/bin/env python3.5
import asyncio
import nats.aio.client
import nats.aio.errors
async def send_message(loop):
mq_url = "nats://nats:password@127.0.0.1:4222"
client = nats.aio.client.Client()
await client.connect(io_loop=loop, servers=[mq_url])
await client.publish("test_subject", "test1".encode())
#await client.flush()
await client.close()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(send_message(loop))
loop.close()
if __name__ == '__main__':
main()
FWIW如果我发送了大量的消息,那么在某个时刻(还没有计算出确切的条件),就会发送消息。我们注意到了在处理一组文件并为文件中的每一行发送一条消息时的行为:一些文件正在通过,另一些则没有,结果是更大的文件(更多的行)使它成功。所以看起来好像有一些内部缓冲区被填满了,这会强制刷新。在
看起来可能是一个刚刚修复的错误:https://github.com/nats-io/asyncio-nats/pull/35
相关问题 更多 >
编程相关推荐