python中的Asyncio:如何在运行协同程序时等待一些文件完成

2024-03-29 10:43:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在创建一个程序,从多个远程站(~200)接收rsync的数据。有时每个工作站和工作站组都有多个文件,不应同时联系这些文件,否则连接将关闭。目前,我有一个异步例程(在python中使用asyncio),它可以一次对所有文件进行异步rsync(到它们各自的工作站)。如果我们得到一个积压的文件或者同时联系同一个组中的站点,就会导致连接关闭。你知道吗

我需要做的是创建分组任务,在工作站组中,它等待上一个文件更新(updateFile()),然后再启动下一个文件,但同时异步运行所有工作站组。你知道吗

对异步编程还不熟悉,我就是不知道如何解决这个问题。你知道吗

我现在已经成功地异步运行了所有的东西。你知道吗

启动回路

loop = asyncio.get_event_loop()

创建分组桩号任务和单个桩号任务

tasks=[]
for group, files in file_groups.items():
   files_in_task = []
   for order, fqdn, file in files:
       if group == 'none':
           futures = [updateFile(file, fqdn)]
           tasks.append(asyncio.gather(*futures))
       else: # In Group
           x = (file, fqdn)
           files_in_task.append(x)
       futures = [updateFile(file,fqdn) for (file,fqdn) in files_in_task]
       tasks.append(asyncio.wait(*futures))

运行事件循环,直到所有任务都返回

loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Tags: 文件inloopasynciofortaskfilesfile
1条回答
网友
1楼 · 发布于 2024-03-29 10:43:24

如果我的理解是正确的,那么您需要实现并行运行的组,每个组按顺序运行其元素。你知道吗

更新单个组的协同程序将由一个简单的循环组成,该循环使用await依次计算各个元素:

async def update_group(files):
    # update a single group, by running updateFiles in series
    for _order, fqdn, filename in files:
        await updateFile(filename, fqdn)

更新所有组需要并行运行update_group()的多个实例。在asyncio中,并行的单位是task,因此我们为每个update_group()创建一个,并使用^{}等待所有任务完成,允许它们并行运行:

async def update_all_groups(file_groups):
    # update all the different groups in parallel
    tasks = []
    for _group, files in file_groups.items():
        tasks.append(asyncio.create_task(update_group(files)))
    await asyncio.gather(*tasks)

最后,从同步代码调用整个设置,理想情况下只需调用^{},即可设置、运行并关闭循环:

asyncio.run(update_all_groups(file_groups))

相关问题 更多 >