从异步函数监视multiprocessing.pool.pool对象

2024-03-28 12:10:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个由discord机器人管理的multiprocessing.pool.Pool。因为discord.py是异步的,所以我使用pool.starmap_async()pool.apply_async()AsyncResult.get()来管理任务。bot在启动时启动一个池(这可能看起来很奇怪,但由于init时间长,这是最有效的方法)

是否有一种方法可以检查在任何给定时刻池中当前排队/执行的进程数量?检查函数可以访问Pool本身,但不能访问任何AsyncResult

我也愿意采用其他方法来获得相同的结果,即池中的活动/排队进程


Tags: 方法pygetasyncinitbot时间机器人
1条回答
网友
1楼 · 发布于 2024-03-28 12:10:55

Since discord.py is async, i use pool.starmap_async(), pool.apply_async() and AsyncResult.get() to manage tasks.

除非采取特殊预防措施,否则这看起来不正确,因为AsyncResult.get()将阻止事件循环。以apply_async等方法的名称命名的异步不是由asyncio部署的那种异步。多处理使用同步代码与子进程通信,在后台线程中这样做可以让代码继续处理其他事情

将异步IO和多处理结合起来的一种更安全的方法是^{}模块,它提供了内部使用多处理的^{}类,其执行器通过^{}支持异步IO

is there a way i can check on how many processes are currently queued/executing in the pool at any given moment?

我认为没有公共API来查询这些信息,但是如果你拥有这个池,你可以很容易地维护必要的统计数据。例如(未经测试):

class Pool:
    def __init__(self, nworkers):
        self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
        self._nworkers = nworkers
        self._submitted = 0

    async def submit(self, fn, *args):
        self._submitted += 1
        loop = asyncio.get_event_loop()
        fut = loop.run_in_executor(self._executor, fn, *args)
        try:
            return await fut
        finally:
            self._submitted -= 1

    def stats(self):
        queued = max(0, self._submitted - self._nworkers)
        executing = min(self._submitted, self._nworkers)
        return queued, executing

您可以通过调用submit()来使用它,您可以等待它立即得到结果,也可以传递给create_task()来获得一个您可以稍后等待的未来,或者gather()与其他未来一起等待,等等

相关问题 更多 >