将异步iterable转换为同步iterable的内置方法

2024-05-16 12:38:51 发布

您现在位置:Python中文网/ 问答频道 /正文

Python3.6现在asynchronous iterables。是否有将异步iterable转换为同步iterable的内置方法。在

但我觉得它现在很有帮助。有更好的方法吗?在

async def aiter_to_list(aiter):
    l = []
    async for i in aiter:
        l.append(i)
    return l

Tags: to方法inforasyncreturndefiterable
3条回答

这些函数允许您从/到iterable<;==>;async iterable,而不仅仅是简单的列表。

基本进口

import asyncio
import threading
import time

DONE = object()
TIMEOUT = 0.001

函数to_sync_iterable将把任何异步iterable转换为sync iterable:

^{pr2}$

你可以这样使用它:

async def slow_async_generator():
    yield 0

    await asyncio.sleep(1)
    yield 1

    await asyncio.sleep(1)
    yield 2

    await asyncio.sleep(1)
    yield 3


for x in to_sync_iterable(slow_async_generator()):
    print(x)

函数to_async_iterable将把任何sync iterable转换为async iterable:

def to_async_iterable(iterable, maxsize = 0):

    async def async_iterable():
        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()
        task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))

        while True:
            x = await queue.get()

            if x is DONE:
                break
            else:
                yield x

        await task


    return async_iterable()

def _consume_iterable(loop, iterable, queue):

    for x in iterable:
        while True:
            if not queue.full():
                loop.call_soon_threadsafe(queue.put_nowait, x)
                break
            else:
                time.sleep(TIMEOUT)

    while True:
        if not queue.full():
            loop.call_soon_threadsafe(queue.put_nowait, DONE)
            break
        else:
            time.sleep(TIMEOUT)

这个对于异步程序特别有用,因为它不会阻止事件循环,即使sync-iterable阻塞了。你可以这样使用它:

def slow_sync_generator():
    yield 0

    time.sleep(1)
    yield 1

    time.sleep(1)
    yield 2

    time.sleep(1)
    yield 3

async def async_task():
    async for x in to_async_iterable(slow_sync_generator()):
        print(x)

asyncio.get_event_loop().run_until_complete(async_task())

您可以使用aiostream.stream.list

from aiostream import stream

async def agen():
    yield 1
    yield 2
    yield 3

async def main():
    lst = await stream.list(agen())
    print(lst)  # prints [1, 2, 3]

documentation中的更多运算符和示例。

您的“异步到同步”助手本身是异步的;根本没有什么大的变化。一般情况下:不,您不能使异步同步。异步值将在“以后某个时候”提供;您不能将其转换为“now”,因为该值不存在于“now”中,您将不得不异步地等待它。

相关问题 更多 >