如何把芹菜和异步菜结合起来?

2024-06-16 09:44:00 发布

您现在位置:Python中文网/ 问答频道 /正文

如何创建一个包装器,使芹菜任务看起来像asyncio.Task?还是有更好的方法把芹菜和asyncio结合起来?

@asksol,芹菜的创造者,said this:

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

但是我找不到任何专门针对asyncio框架的代码示例。


Tags: to方法asynciotaskusetopitcommon
3条回答

您可以使用run_in_executor将任何阻塞调用包装到任务中,如documentation中所述,我还在示例中添加了一个自定义timeout

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)

如官方网站所述,芹菜5.0版将有可能做到这一点:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. The next major version of Celery will support Python 3.5 only, were we are planning to take advantage of the new asyncio library.
  2. Dropping support for Python 2 will enable us to remove massive amounts of compatibility code, and going with Python 3.5 allows us to take advantage of typing, async/await, asyncio, and similar concepts there’s no alternative for in older versions.

以上内容引用自上一链接。

所以最好的办法就是等待5.0版的发布!

同时,快乐的编码:)

我发现最干净的方法是将async函数包装在asgiref.sync.async_to_sync(来自^{})中:

from asgiref.sync import async_to_sync
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

我从我写的blog post中提取了这个例子。

相关问题 更多 >