我想批量从API获取数据,然后处理这些数据。当CPU在当前批处理的下载过程中处于空闲状态时,如果另一个线程可以处理来自前一个线程的数据,那就更好了
下面使用n
线程下载批处理并对其进行处理。我观察到的是所有n
线程将同时下载它们的批处理,然后它们将同时处理数据。不完全是我想要的。有什么建议吗
import concurrent.futures
import threading
thread_local = threading.local()
def fetch_async(batch_size, threads=3):
def fetch(offset):
print(f'Starting download offset={offset}, id={threading.get_ident()}')
res = download(offset, batch_size)
return process(res)
print(f'Fetching async: batch_size={batch_size}')
offsets = list(range(0, total + batch_size, batch_size))
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
res = executor.map(fetch, offsets)
# materialize + flatten iterator
res = list(res)
return [y for x in res for y in x]
我在日志中看到的是语句Starting download
为每个线程连续快速打印,然后在下载和处理过程中暂停。然后,一些语句基本上同时出现,等等。下载部分需要3-5秒才能完成。我尝试了3、4、8和12个线程。相同的行为,完成整个过程所需的时间大致相同(进行约100个单独的API调用)
目前没有回答
相关问题 更多 >
编程相关推荐