python中异步函数中要调用的所有参数是什么?

2024-04-23 15:48:42 发布

您现在位置:Python中文网/ 问答频道 /正文

  1. 我有一个函数var。我想知道通过利用系统拥有的所有处理器、内核和RAM内存进行多处理/并行处理来快速运行该函数中循环的最佳方法

    import numpy as np
    from pysheds.grid import Grid
    
    xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
    ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    def var(interest):
    
        variable_avg = []
        for (x,y) in zip(xs,ys):
            grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    
            grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label') 
    
            grid.clip_to('catch')
    
            grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    
            variablemask = grid.view('variable', nodata=np.nan)
            variablemask = np.array(variablemask)
            variablemean = np.nanmean(variablemask)
            variable_avg.append(variablemean)
        return(variable_avg)
    
    
  2. 如果我能为函数的给定多个参数同时运行函数var和循环,那就太好了。 例如:同时调用var(a)var(b)。因为它只需为多个坐标(xs,ys)并行循环就可以节省很多时间

pysheds可以在here找到文档。

grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')的代码中使用的data.tif数据可以直接从here下载。相同的数据可以用不同的名称复制到目录中,并在a = r'/home/test/image1.tif'处使用 和b = r'/home/test/image2.tif'用于测试代码。

为了加速上述代码,我得到了一个建议here,如下所示:

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time() 

但是,我不明白如何调用函数var_loop_async(interest, pool, loop)。事实上,我无法获得要调用哪些参数来代替poolloop

我对python编程非常陌生

如果可能,请将上述建议作为一个可复制的解决方案,以便可以直接在python中运行。或者,如果您有任何其他更好的建议,以加快原始代码,请一定要让我知道


Tags: nameloopmaphomedataasynctimevar
1条回答
网友
1楼 · 发布于 2024-04-23 15:48:42

首先,在您的原始代码中,我看到:

for (x,y) in zip(xs,ys):
    grid = Grid.from_raster(interest, data_name='map')

我不熟悉pysheds模块,也找不到任何关于它的文档,因此我不知道Grid.from_raster是否是一个昂贵的操作。但这条语句似乎是在for循环之上移动而不是在循环中重新计算的候选语句。也许仅此一项就可以显著提高性能。您提到的链接What all parameters to be called in a async function in python?表明,创建进程池的开销可能不足以弥补这些麻烦。此外,如果Grid.from_raster很昂贵,并且通过将其从循环中移除而获利,那么多处理解决方案本质上通过使其对每个x,y对执行而“将其放回循环”,从而使多处理解决方案不太可能导致性能改进

无论如何,要使用建议的技术运行代码,请参见下面的。不幸的是,您不能在处理器池中同时运行process_poivar_loop_async。但请在下面进一步寻找不同的解决方案

import numpy
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

不同的解决方案

您希望能够在进程池中为每个要处理的文件运行var,然后在子进程中处理每个x,y对。这意味着您需要处理文件的每个子进程都有自己的进程池来处理x,y对。这通常是不可能的,因为为进程池创建的进程是守护进程进程(它们在主进程终止时自动终止),并且不允许它们创建自己的子进程。为了克服这个问题,我们必须创建自己的mutliprocessor.Pool专门化,并用自己的池初始化每个子进程

但这会是一种性能改进吗除了等待process_poi子进程完成其工作外,var子进程基本上什么也不做。因此,我不认为这比以前的代码有多大的改进。而且,正如我所提到的,目前还不清楚这两种多处理解决方案是否会比原始代码有所改进,尤其是修改为重新定位Grid.from_raster调用的方案

import numpy
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os

# This allows subprocesses to have their own pools:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'


pool2 = None

def init_pool():
    global pool2
    #pool2 = Pool(5)
    pool2 = Pool(os.cpu_count // 2) # half the available number of processors


def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(interest):
    task = functools.partial(process_poi, interest)
    return pool2.starmap(task, zip(xs, ys))


def main():
    # This will create non-daemon processes so that these processes can create their own pools:
    with MyPool(2, init_pool) as pool:
        results = pool.map(var, [a, b])
        print(results)


if __name__ == "__main__":
    main()

使用线程的第三种解决方案

使用asyncio

import numpy
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = [
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
]

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=100) as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

备选方案:

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'



def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(executor, interest):
    return list(executor.map(functools.partial(process_poi, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), [a, b]))
        print(results)


if __name__ == "__main__":
    main()

使用基于OP更新代码的线程更新解决方案

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(executor, interest):
    return list(executor.map(functools.partial(process_point, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), file_list))
        print(results)


if __name__ == "__main__":
    main()

相关问题 更多 >