在asyncio协议/服务器间通信

3 投票
1 回答
2592 浏览
提问于 2025-04-19 09:42

我正在尝试写一个服务器端事件(Server Side Events)服务器,这样我就可以用telnet连接到它,并把telnet的内容推送到浏览器上。使用Python和asyncio的想法是尽量减少CPU的使用,因为这个服务器会在树莓派上运行。

到目前为止,我有以下代码,它使用了一个库,链接在这里:https://pypi.python.org/pypi/asyncio-sse/0.1,这个库是基于asyncio的。

我还复制了一个同样使用asyncio的telnet服务器。

这两个部分各自都能正常工作,但我不知道怎么把它们结合在一起。根据我的理解,我需要在Telnet.data_received里面调用SSEHandler类中的send()方法,但我不知道怎么访问它。两个“服务器”都需要在一个循环中运行,以接受新的连接或推送数据。

有没有人能帮忙,或者给我指个方向?

import asyncio
import sse

# Get an instance of the asyncio event loop
loop = asyncio.get_event_loop()

# Setup SSE address and port
sse_host, sse_port = '192.168.2.25', 8888

class Telnet(asyncio.Protocol):
    def connection_made(self, transport):
        print("Connection received!");
        self.transport = transport

    def data_received(self, data):
        print(data)
        self.transport.write(b'echo:')
        self.transport.write(data)

        # This is where I want to send data via SSE
        # SSEHandler.send(data)

        # Things I've tried :(
        #loop.call_soon_threadsafe(SSEHandler.handle_request());
        #loop.call_soon_threadsafe(sse_server.send("PAH!"));

    def connection_lost(self, esc):
        print("Connection lost!")
        telnet_server.close()

class SSEHandler(sse.Handler):
    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')

# SSE server
sse_server = sse.serve(SSEHandler, sse_host, sse_port)

# Telnet server
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777))

#telnet_server.something = sse_server;

loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())

1 个回答

7

服务器端事件是一种http协议;在任何时刻,你可能会有多个http请求同时进行。如果没有人连接,可能会有零个请求,或者有很多个。这个细节体现在两个东西上:sse.servesse.Handler。前者代表一个单独的监听端口,它会把每个不同的客户端请求分发给后者。

另外,sse.Handler.handle_request()会为每个客户端调用一次,当这个协程结束时,客户端就会断开连接。在你的代码中,这个协程会立即结束,所以客户端只会看到一个“正在工作”的事件。因此,我们需要等待,几乎是永远的等待。我们可以通过yield from一个asyncio.Future()来实现。

第二个问题是,我们需要以某种方式获取所有独立的SSEHandler()实例,并在每个实例上使用send()方法。我们可以让每个实例在它们的handle_request()方法中自我注册;通过将每个实例添加到一个字典中,这个字典将每个处理器实例与它们正在等待的未来关联起来。

class SSEHandler(sse.Handler):
    _instances = {}

    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')
        my_future = asyncio.Future()
        SSEHandler._instances[self] = my_future
        yield from my_future

现在,要向每个正在监听的实例发送事件,我们只需访问字典中注册的所有SSEHandler实例,并对每个实例使用send()

class SSEHandler(sse.Handler):

    #...

    @classmethod
    def broadcast(cls, message):
        for instance, future in cls._instances.items():
            instance.send(message)

class Telnet(asyncio.Protocol):

    #...

    def data_received(self, data):
        #...
        SSEHandler.broadcast(data.decode('ascii'))

最后,当telnet连接关闭时,你的代码会退出。这没问题,但我们也应该在那个时候进行清理。幸运的是,这只需要在所有处理器的所有未来上设置一个结果。

class SSEHandler(sse.Handler):

    #...

    @classmethod
    def abort(cls):
        for instance, future in cls._instances.items():
            future.set_result(None)
        cls._instances = {}

class Telnet(asyncio.Protocol):

    #...

    def connection_lost(self, esc):
        print("Connection lost!")
        SSEHandler.abort()
        telnet_server.close()

这里有一个完整的、可工作的示例,以防我的说明不够清楚。

import asyncio
import sse

loop = asyncio.get_event_loop()
sse_host, sse_port = '0.0.0.0', 8888

class Telnet(asyncio.Protocol):
    def connection_made(self, transport):
        print("Connection received!");
        self.transport = transport

    def data_received(self, data):
        SSEHandler.broadcast(data.decode('ascii'))

    def connection_lost(self, esc):
        print("Connection lost!")
        SSEHandler.abort()
        telnet_server.close()

class SSEHandler(sse.Handler):
    _instances = {}
    @classmethod
    def broadcast(cls, message):
        for instance, future in cls._instances.items():
            instance.send(message)

    @classmethod
    def abort(cls):
        for instance, future in cls._instances.items():
            future.set_result(None)
        cls._instances = {}

    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')
        my_future = asyncio.Future()
        SSEHandler._instances[self] = my_future
        yield from my_future

sse_server = sse.serve(SSEHandler, sse_host, sse_port)
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777))
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())

撰写回答