当另一个线程从API获取数据时,如何同步一个线程来处理数据?

2024-03-29 06:15:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我想批量从API获取数据,然后处理这些数据。当CPU在当前批处理的下载过程中处于空闲状态时,如果另一个线程可以处理来自前一个线程的数据,那就更好了

下面使用n线程下载批处理并对其进行处理。我观察到的是所有n线程将同时下载它们的批处理,然后它们将同时处理数据。不完全是我想要的。有什么建议吗

import concurrent.futures
import threading


thread_local = threading.local()

def fetch_async(batch_size, threads=3):
    def fetch(offset):
        print(f'Starting download offset={offset}, id={threading.get_ident()}')
        res = download(offset, batch_size)
        return process(res)


    print(f'Fetching async: batch_size={batch_size}')
    offsets = list(range(0, total + batch_size, batch_size))

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        res = executor.map(fetch, offsets)

    # materialize + flatten iterator
    res = list(res)
    return [y for x in res for y in x]

我在日志中看到的是语句Starting download为每个线程连续快速打印,然后在下载和处理过程中暂停。然后,一些语句基本上同时出现,等等。下载部分需要3-5秒才能完成。我尝试了3、4、8和12个线程。相同的行为,完成整个过程所需的时间大致相同(进行约100个单独的API调用)