用b保存并获取数据

2024-04-18 18:38:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我遇到了一种情况-我在python3.x中使用asyncio包,并将数据持久化到with块中,类似这样:

test_repo = TestRepository()

with (yield from test_repo):
    res = yield from test_repo.get_by_lim_off(
            page_size=int(length),
            offset=start,
            customer_name=customer_name,
            customer_phone=customer_phone,
            return_type=return_type
        )

我需要在with块中获取res数据,但是当我退出with块时,应该发生持久化和获取数据的操作。我怎样才能做到这一点?你知道吗


Tags: 数据namefromtestasyncioreturntypewith
1条回答
网友
1楼 · 发布于 2024-04-18 18:38:04

只有在Python 3.5+中,通过异步上下文管理器(__aenter__/__aexit__)和async with才支持此行为,这两种管理器都添加在PEP 492

class TestRepository:
   # All your normal methods go here

   async def __aenter__(self):
      # You can call coroutines here
      await self.some_init()

   async def __aexit__(self, exc_type, exc, tb):
      # You can call coroutines here
      await self.do_persistence()
      await self.fetch_data()


async def do_work():
    test_repo = TestRepository()

    async with test_repo:
        res = await test_repo.get_by_lim_off(
                page_size=int(length),
                offset=start,
                customer_name=customer_name,
                customer_phone=customer_phone,
                return_type=return_type
            )

 asyncio.get_event_loop().run_until_complete(do_work())

在3.5之前,必须使用try/finally块来显式调用init/cleanup协程,不幸的是:

@asyncio.coroutine
def do_work():
    test_repo = TestRepository()

    yield from test_repo.some_init()
    try:
        res = yield from test_repo.get_by_lim_off(
                page_size=int(length),
                offset=start,
                customer_name=customer_name,
                customer_phone=customer_phone,
                return_type=return_type
            )
    finally:
        yield from test_repo.do_persistence()
        yield from test_repo.fetch_data()

相关问题 更多 >