我有一个程序,加载数据和处理它。加载和处理都需要时间,我想同时进行。你知道吗
这是我的程序的同步版本(其中“加载”和“处理”是按顺序完成的,为了示例,这里是一些琐碎的操作):
import time
def data_loader():
for i in range(4):
time.sleep(1) # Simulated loading time
yield i
def main():
start = time.time()
for data in data_loader():
time.sleep(1) # Simulated processing time
processed_data = -data*2
print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')
if __name__ == '__main__':
main()
当我运行这个时,我得到输出:
At t=2.01, processed data 0 into 0
At t=4.01, processed data 1 into -2
At t=6.02, processed data 2 into -4
At t=8.02, processed data 3 into -6
循环每2s运行一次,1s加载,1s处理。你知道吗
现在,我想制作一个异步的版本,在这个版本中,加载和处理是同时进行的(这样,在处理器处理下一个数据时,加载程序就准备好了)。然后,打印第一条语句需要2秒,之后的每条语句需要1秒。预期产出类似于:
At t=2.01, processed data 0 into 0
At t=3.01, processed data 1 into -2
At t=4.02, processed data 2 into -4
At t=5.02, processed data 3 into -6
理想情况下,只有main
函数的内容需要更改(因为data_loader
代码不应该关心它是否可以异步使用)。你知道吗
下面是一个解决方案,它允许您使用
iter_asynchronously
函数包装数据加载器。它暂时解决了这个问题。(但是请注意,仍然存在一个问题,即如果dataloader比处理循环快,队列将无限期地增长。如果队列变大,那么可以通过在_async_queue_manager
中添加等待来轻松解决这个问题(但遗憾的是,Mac不支持Queue.qsize()
)你知道吗现在输出符合要求:
^{} 模块的实用程序可能就是您想要的。你知道吗
这个输出
根据您的需求,您可能会发现
.imap_unordered()
更快,而且值得一提的是,有一个基于线程的Pool
版本可以作为multiprocessing.dummy.Pool
使用—如果您的数据很大,并且您的处理不是用Python完成的,那么这可能有助于避免IPC开销(因此您可以避免GIL)。你知道吗问题的关键在于数据的实际处理。我不知道您在实际程序中对数据做什么,但要使用异步编程,它必须是一个异步操作。如果您正在执行活动的、阻塞的CPU绑定处理,那么最好将负载转移到一个单独的进程,这样就可以同时使用多个CPU核并执行任务。如果数据的实际处理实际上只是某个异步服务的消耗,那么它可以非常有效地包装在单个异步并发线程中。你知道吗
在您的示例中,您使用
time.sleep()
来模拟处理。由于该示例操作可以异步完成(改为使用asyncio.sleep()
),因此转换很简单:结果,如你所料:
请记住,它之所以有效,是因为
time.sleep()
有一个asyncio.sleep()
形式的异步替代方案。检查您正在使用的操作,看看它是否可以以异步形式编写。你知道吗相关问题 更多 >
编程相关推荐