从类运行时出现python nats连接错误

2024-05-17 18:49:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试运行一个应用程序,该应用程序在继续之前等待特定的nats消息。因此,我创建了以下类,该类发送消息并侦听消息:

#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio


class NatsService:

    def __init__(self):
        self.NATSERVER = "XXXXX"
        self.CHANNEL = "msgchannel"
        self.CONNECTTIMEOUT = 1
        self.MAXRECONNECTATTEMPTS = 1
        self.RECONNECTTIMEWAIT = 1
        self.nc = NATS()

    async def send_message(self, message, channel=None):
        if not channel:
            channel = self.CHANNEL

        print("connecting to nats server")
        await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
                              max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
                              reconnect_time_wait=self.RECONNECTTIMEWAIT)

        print(f"Publishing message: '{message}' to channel: {channel}")
        await self.nc.publish(channel, message.encode('utf-8'))
        print("message sent, closing connection")
        await self.nc.close()
        print("nats server connection closed")

    def start_listening(self):
        loop = asyncio.get_event_loop()
        try:
            loop.create_task(self.listener_loop(loop))
            loop.run_forever()
        finally:
            loop.close()

    async def listener_loop(self, loop):
        print("connecting to nats listener loop")
        await self.nc.connect(self.NATSERVER, loop=loop)

        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print('Received a message on {}: {}'.format(subject, data))

            if eval(data.split(":::")[1]):
                print("message received, closing")
                await nc.drain()    # timeout occurs for some reason
                print("stopping loop")
                loop.stop()
            
        await self.nc.subscribe(self.CHANNEL, cb=msg_handler)

我在两个应用程序中导入这个类,一个用于发送消息,另一个用于侦听这些消息,直到收到正确的消息

我的主应用程序侦听消息,只有在收到正确消息后才继续

from nats_service import NatsService

try:
    print("starting nats service instance")
    ns = NatsService()
    print("listening for approved message")
    start_listening()
except Exception as e:
    print(f"Error: {e}")

print(f"Contiuing with application...")

另一个应用程序用于发送消息:

from nats_service import NatsService
import asyncio

async def main():
    ns = NatsService()
    message = "test"
    await ns.send_message(message)

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(f"Completed sender function.")

我能够发送和接收独立函数上的消息,这些函数是我在将它们组合在一起之前创建的。但是当从上面的类导入它们时,我似乎无法运行它们,尤其是在异步IO出现问题时

经过一些尝试和错误后,当我运行sender时,它最终似乎启动了,但立即失败,出现了一个错误,我不理解,也找不到有关以下内容的更多信息:

connecting to nats server
nats: encountered error
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
    connection_future, self.options['connect_timeout']
  File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
    protocol = StreamReaderProtocol(reader, loop=loop)
  File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
    self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
  File "sender_side.py", line 13, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "sender_side.py", line 9, in main
    await ns.send_message(message)
  File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
    reconnect_time_wait=self.RECONNECTTIMEWAIT)
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
    await self._select_next_server()
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
    self.options["reconnect_time_wait"], loop=self._loop
  File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
    future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'

Tags: inpyselfloopasyncio消息messagelib
1条回答
网友
1楼 · 发布于 2024-05-17 18:49:18

您正在将self.CONNECTTIMEOUT作为位置参数传递给Client.connect,在那里它需要对其io_loop参数进行引用。这就是为什么会得到一个AttributeError:一个int没有create_future属性。将超时作为connect_timeout=self.CONNECTTIMEOUT传递,这个问题应该会消失

相关问题 更多 >