如何在Python的asyncio中使用Future对象?

2024-04-23 09:06:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我理解了事件循环的基本思想。有一个中心循环监听一组文件描述符,如果它准备好读或写,则执行相应的回调。你知道吗

我们可以使用co例程而不是回调,因为它们可以暂停和恢复。但是,这意味着在co例程和事件循环之间应该有一些通信协议,以使事情正常工作?你知道吗

我用co例程编写了一个简单的Echo服务器,它将产生fd以及像这样的yield fd, 'read'yield fd, 'write'等感兴趣的操作,然后事件循环将相应地注册select。回调将只是恢复co例程。它工作得很好,我添加了下面的代码。你知道吗

现在我只是想了解await实际上是如何工作的。它似乎不像我的示例代码那样生成fds和相应的操作,相反,它提供了一个Future对象。那么引擎盖下到底发生了什么?它是如何与事件循环通信的?你知道吗

我猜await async.sleep(1)是这样执行的:

  1. 事件循环将执行co例程并到达async.sleep(1)。你知道吗
  2. 它将创建一个Future对象。你知道吗
  3. 然后,它将创建一个fd,可能使用带有回调的timerfd_create来完成Future。你知道吗
  4. 然后它将把它提交给事件循环进行监视。你知道吗
  5. await将把Future对象产生给执行它的事件循环。你知道吗
  6. 事件循环将Future对象的回调函数设置为只恢复协同程序。你知道吗

我的意思是我可以像这样利用Future。但这是真的吗?有人能帮我更好地理解这一点吗?你知道吗

PS:timerfd_create只是作为一个例子,因为我无法理解如何在事件循环中实现计时器。就这个问题而言,网络fds也将是fin。如果有人能帮助我如何实现定时器,那也太好了!你知道吗

下面是我使用co例程实现的一个简单Echo服务器:

"""
Tasks are just generators or coroutines
"""
import socket
import selectors

select = selectors.DefaultSelector()
tasks_to_complete = []

def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    hostname = socket.gethostname()
    s.bind((hostname, port))
    s.listen(5)
    print("Starting server on hostname at port %s %s" % (hostname, port))
    return s

def handle_clients(s):
    while True:
        print("yielding for read on server %s" % id(s))
        yield (s, 'read')
        c, a = s.accept()
        t = handle_client(c)
        print("appending a client handler")
        tasks_to_complete.append(t)

def handle_client(c):
    while True:
        print("yielding for read client %s" % id(c))
        yield (c, 'read')
        data = c.recv(1024)
        if len(data) == 0:
            return "Connection Closed"
        print("yielding for write on client %s" % id(c))
        yield (c, 'write')
        c.send(bytes(data))

def run(tasks_to_complete):
    while True:
        while tasks_to_complete:
            t = tasks_to_complete.pop(0)
            try:
                fd, event = t.send(None)
                if event == 'read':
                    event = selectors.EVENT_READ
                elif event == 'write':
                    event = selectors.EVENT_WRITE
                def context_callback(fd, t):
                    def callback():
                        select.unregister(fd)
                        tasks_to_complete.append(t)
                    return callback
                select.register(fd, event, context_callback(fd, t))
            except StopIteration as e:
                print(e.value)
        events = select.select()
        for key, mask in events:
            callback = key.data
            callback()

tasks_to_complete.append(handle_clients(create_server(9000)))

run(tasks_to_complete)

Tags: toeventreaddefcallback事件futuresocket
1条回答
网友
1楼 · 发布于 2024-04-23 09:06:20

I wrote a simple Echo Server with co-routines, which would yield the fd along with the interested action like this yield fd, 'read', yield fd, 'write' etc and then the Event Loop would register the select accordingly.

这与Dave Beazley的curio的工作原理类似。要了解更多关于这个概念的信息,请参阅lecture,在这里他从最基本的内容构建了一个事件循环。(他使用pre-3.5 yield from语法,但其工作原理与await完全相同。)

正如您所发现的,asyncio的工作方式有点不同,尽管原理仍然相似。你知道吗

Now I am just trying to understand how await actually works. It doesn't seem to yield fds, and string corresponding to the action like the above example, instead it gives you a Future object. So what exactly happens under-the-hood? How is it communicating with the Event Loop?

简短的版本是阻塞协程使用全局变量(通过asyncio.get_event_loop())来获取事件循环。事件循环有一些方法,这些方法在发生有趣的事件时调度要调用的回调。asyncio.sleepcallsloop.call_later以确保超时结束时恢复。你知道吗

生成的Future只是一种方便的方法,事件循环一旦准备好就可以将结果通知给它,这样它就可以正确地恢复等待阻塞操作的Task(由事件循环驱动的协程),同时还可以处理异常和取消。见^{}了解the gory details。你知道吗

timerfd_create was just taken as an example because I couldn't understand how timers can be implemented in an Event Loop.

计时器的实现使得事件循环跟踪文件描述符和超时,以及earliest timeout结束时终止的issues a ^{}。上面链接的戴夫的演讲简洁地说明了这个概念。你知道吗

相关问题 更多 >