将现有的基于Python `socket`的类与`asyncio`封装和集成

0 投票
1 回答
54 浏览
提问于 2025-04-14 15:24

我有一个场景,需要把很多TCP/IP客户端放到一个Python进程里。现在我想用asyncio来让程序可以同时处理多个任务。对于所有的TCP/IP客户端,我会使用asyncio,而对于所有新的HTTP客户端,我会使用aiohttp

不过,我还有一些已有的客户端是用Python的socket模块写的。我的问题是:应该怎么把这些基于socket的类和方法“包装”成async的形式呢?


下面是一个已有的TCP/IP客户端示例:

import socket

class ExistingClient:
    def __init__(self, host: str, port: int) -> None:
        self.__host = host
        self.__port = port
        self.__socket: socket.socket | None = None

    def initialize(self) -> None:
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__socket.connect((self.__host, self.__port))

    def get_status(self) -> int:
        self.__socket.send("2\n".encode())
        data: str = str(self.__socket.recv(1024).decode()).strip()
        return int(data)

    def close(self) -> None:
        self.__socket.close()

TCP/IP服务器示例

你可以运行这个服务器脚本,作为下面客户端脚本的示例。保存为server.py

import asyncio
import functools

class State:
    def __init__(self) -> None:
        self.__count: int = 0

    def add_to_count(self, count: int) -> None:
        self.__count = self.__count + count

    @property
    def count(self) -> int:
        return self.__count


async def handle_echo(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter, state: State
):
    # Ignore the use of the infinite while loop for this example.
    # Controlling the loop would be handled in a more sophisticated way.
    while True:
        data = await reader.readline()
        message: int = int(data.decode())
        state.add_to_count(message)

        addr = writer.get_extra_info("peername")

        writer.write(f"{state.count}\n".encode())
        await writer.drain()

    writer.close()
    await writer.wait_closed()


async def server(port: int):
    state = State()
    partial = functools.partial(handle_echo, state=state)
    server = await asyncio.start_server(partial, "127.0.0.1", port)

    address = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    print(f"Serving on {address}")

    async with server:
        await server.serve_forever()


async def main():
    await asyncio.gather(
        server(8888),
        server(8889),
    )

asyncio.run(main())

可能的解决方案1

对我来说,一个解决方案是可以对已有的类进行修改,添加async方法,这些方法实际上会调用阻塞的I/O socket操作。比如:

import socket

class ExistingClient:
    def __init__(self, host: str, port: int) -> None:
        self.__host = host
        self.__port = port
        self.__socket: socket.socket | None = None

    def initialize(self) -> None:
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__socket.connect((self.__host, self.__port))

    async def async_initialize(self) -> None:
        self.initialize()

    def get_status(self) -> int:
        self.__socket.send("2\n".encode())
        data: str = str(self.__socket.recv(1024).decode()).strip()
        return int(data)

    async def async_get_status(self) -> int:
        return self.get_status()

    def close(self) -> None:
        self.__socket.close()

    async def async_close(self) -> None:
        self.close()

这样做是否能确保不阻塞asyncio的事件循环,使其行为和asyncio流调用非常相似?下面是我想象的如何将这样的代码与普通的asyncio代码结合(假设这个修改过的ExistingClientasync方法保存在同一个文件client.py中):

import asyncio

async def streams_tcp_client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)

    writer.write("1\n".encode())
    await writer.drain()

    data: bytes = await reader.readline()
    print(f"asyncio streams data: {data.decode().strip()}")

    writer.close()
    await writer.wait_closed()


async def existing_tcp_client1():
    existing_client = ExistingClient("127.0.0.1", 8889)
    await existing_client.async_initialize()

    data: int = await existing_client.async_get_status()
    print(f"Existing client data: {data}")

    await existing_client.async_close()


async def main():
    await asyncio.gather(
        streams_tcp_client(),
        existing_tcp_client1(),
    )

这样做是否能如预期那样,使得ExistingClientasync调用(包含阻塞的I/O调用)不会阻塞这个asyncio事件循环?

我运行了这段代码,结果正常,打印出了预期的数据。但不清楚如何测试事件循环是否按预期运行。


可能的解决方案2

我看到有人提到过asyncio.to_thread。文档中提到:

这个协程函数主要用于执行那些如果在主线程中运行会阻塞事件循环的I/O绑定函数/方法。

不过,这并没有解释清楚。而且也没有说明为什么简单地定义

async def async_blocking_io():
    blocking_io()

不足以避免阻塞事件循环。因为实际的阻塞I/O,比如TCP/IP的读写操作,应该不会阻塞事件循环的线程,对吧?

使用这个方法的方式应该是这样的:

async def existing_tcp_client2():
    existing_client = ExistingClient("127.0.0.1", 8889)
    await asyncio.to_thread(existing_client.initialize)

    data: int = await asyncio.to_thread(existing_client.get_status)
    print(f"Existing client data with to_thread: {data}")

    await asyncio.to_thread(existing_client.close)

如果我把main修改成这样,它就能运行:

async def main():
    await asyncio.gather(
        streams_tcp_client(),
        existing_tcp_client1(),
    )

最后的思考

定义async def async_<existing_method>方法和使用asyncio.to_thread方法有什么区别?asyncio.to_thread方法让我有点担心,因为每次调用都会在新线程中运行?这可能会对线程不安全的类造成问题,并且会因为不断创建新线程而增加开销。

还有其他解决这个问题的方法吗?

1 个回答

1

方案1 - 只是把同步函数包裹在一个async的外壳里 - 仍然会阻塞你的主进程。在一个TCP服务器/客户端中,你可以尝试通过重写底层函数来解决这个问题,这些函数负责从套接字发送/读取数据,把它们也改成异步的,这样就可以分块处理通信。但如果你不这样做,方案1就无法提供更多的并发性。

如果你简单地定义

async def async_blocking_io():
    blocking_io()

那么这会阻塞进程。即使你加上一个正式的await语句,比如这样:

async def async_blocking_io():
    await asyncio.sleep(0) # first gives other tasks a chance
    blocking_io()

现在其他的协程可能会先运行,但你在同步的blocking_io()调用中仍然会被阻塞,因为底层的TCP读写操作也是同步的,并且会阻塞(如果你把套接字设置为非阻塞模式,你还需要使用阻塞的选择调用来知道数据是否可用)。

asyncio.to_thread函数确实是在一个单独的线程中运行,因为这是防止阻塞的唯一方法(当使用简单的高层封装时)。所以,如果你频繁调用这个函数,确实会产生很多额外的开销,这可能会抵消使用asyncio的目的。

没有简单的解决办法。你要么选择asyncio.to_thread的便利性(虽然有开销),要么就彻底重写所有代码,包括现有的客户端,使用asyncio.streams,或者你可以半途而废,把现有的高层同步代码改成异步(但这只有在你也能把它们最底层的函数——与套接字的通信——改成异步时才有帮助)。

撰写回答