Python: efficiency of parallel asynchronous calls fetching data from web services

Asked 2025-04-18 14:31

I'm writing a Python script that fetches the list of hosts belonging to a particular group_id. I get those hosts through a web service call, and there can be as many as 10,000 of them. Then, for each host, I fetch a value called property from another web service.
So the flow is: group_id ----(ws1)----> 10,000 hosts --(ws2)----> property for each host
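For illustration, the bodies of call_ws_1 and call_ws_2 might look roughly like this; the endpoints, parameters, and response shapes below are placeholders, not the real services:

import requests

WS1_URL = "https://example.com/ws1/hosts"     # placeholder endpoint
WS2_URL = "https://example.com/ws2/property"  # placeholder endpoint

def call_ws_1(group_id):
    # Fetch the list of hosts for a group_id (response shape assumed)
    resp = requests.get(WS1_URL, params={"group_id": group_id}, timeout=30)
    resp.raise_for_status()
    return resp.json()["hosts"]

def call_ws_2(host):
    # Fetch the property value for a single host (response shape assumed)
    resp = requests.get(WS2_URL, params={"host": host}, timeout=30)
    resp.raise_for_status()
    return resp.json()["property"]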

I'm using concurrent.futures, and the code below shows how. But I feel this design isn't very clean, and it probably won't scale well either.

import concurrent.futures


def call_ws_1(group_id):
    # fetch list of hosts for group_id
    pass


def call_ws_2(host):
    # find property for host
    pass


def fetch_hosts(group_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                pass  # logging etc
            else:
                fetch_property(hosts)


def fetch_property(hosts):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_host = {executor.submit(call_ws_2, host): host for host in hosts}
        for future in concurrent.futures.as_completed(future_to_host):
            host = future_to_host[future]
            try:
                host_prop = future.result()  # String
            except Exception as exp:
                pass  # logging etc
            else:
                pass  # Save host and property to DB

1. Would using ProcessPoolExecutor bring any benefit?
2. What about fetching all the hosts first (around 40,000 of them) and only then calling the web service for the property?
3. Any other suggestions to improve this design?

1 Answer

1. The benefit of ProcessPoolExecutor is that it is not affected by the Global Interpreter Lock (GIL). With ThreadPoolExecutor, the GIL keeps more than one thread from running at a time, unless a thread is doing I/O. The good news is that both of your workers appear to be doing almost nothing but I/O; however, any processing done in a worker before or after its web service call won't actually run in parallel, and that will hurt your performance. ProcessPoolExecutor has no such limitation, but it has to pass the group_id and host data between processes. With tens of thousands of hosts, passing them across the process boundary one at a time will introduce quite a lot of overhead.

2. I don't think that change by itself will make much of a difference, because in the end you're still sending each host to a worker one at a time.

As for #3: if your workers are doing almost nothing but I/O, this approach may work fine. But any CPU-bound work in the workers will kill your performance. I implemented your two workers, following your program structure, like this:

def call_ws_1(group_id):
    return list(range(20))

def call_ws_2(host):
    sum(range(33000000))  # CPU-bound
    #time.sleep(1)  # I/O-bound
    return "{} property".format(host)

And then ran the whole thing like this:

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))

With time.sleep, the output is:

Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 25.051292896270752

With the sum(range(33000000)) computation instead, performance is much worse:

Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 75.81612730026245

Note that this computation takes about one second on my laptop:

>>> timeit.timeit("sum(range(33000000))", number=1)
1.023313045501709
>>> timeit.timeit("sum(range(33000000))", number=1)
1.029937982559204

So each worker takes about one second either way. But because one of them is CPU-bound, it gets hit by the GIL and the threaded version performs terribly.

Here's ProcessPoolExecutor with time.sleep:

Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 25.169482469558716

And now with sum(range(33000000)):

Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 43.54587936401367

As you can see, while it's still slower than time.sleep (probably because the computation takes a bit more than a second, and the CPU-bound work has to compete with everything else running on the laptop), it still vastly outperforms the threaded version.

However, I suspect that as the number of hosts grows, the inter-process communication (IPC) overhead will start to slow you down. Here's ThreadPoolExecutor handling 10,000 hosts, but with workers that do nothing at all (they just return):

Fetching hosts for c
Fetching hosts for b
Fetching hosts for d
Fetching hosts for a
Fetching hosts for e
Total time: 9.535644769668579

ProcessPoolExecutor相比:

Fetching hosts for c
Fetching hosts for b
Fetching hosts for a
Fetching hosts for d
Fetching hosts for e
Total time: 36.59257411956787

So ProcessPoolExecutor comes out 4x slower, entirely because of the IPC overhead.
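(For reference, the answer doesn't show the do-nothing variant it measured; the workers behind those last two runs would look roughly like this:)

def call_ws_1(group_id):
    return list(range(10000))  # 10,000 "hosts" per group, no real web service call

def call_ws_2(host):
    return host  # just return immediately; no I/O and no CPU work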

So what does all this mean? I think the best performance you can get here is to use ProcessPoolExecutor, but batch the IPC so that you send large chunks of hosts to the child processes rather than one host at a time.

Something like this (untested, but it should give you the idea):

import time
import itertools
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor as Pool

def call_ws_1(group_id):
    return list(range(10000))

def call_ws_2(hosts):  # This worker now works on a list of hosts
    host_results = []
    for host in hosts:
        host_results.append(( host, "{} property".format(host)))  # returns a list of (host, property) tuples
    return host_results

def chunk_list(l):
    chunksize = max(1, len(l) // 16)  # Break the list into smaller pieces
    it = [iter(l)] * chunksize
    for item in itertools.zip_longest(*it):
        # Drop only the zip_longest padding; filter(None, ...) would also drop host 0
        yield tuple(i for i in item if i is not None)

def fetch_property(hosts):
    with Pool(max_workers=4) as executor:
        futs = []
        for chunk in chunk_list(hosts):
            futs.append(executor.submit(call_ws_2, chunk))
        for future in concurrent.futures.as_completed(futs):
            try:
                 results = future.result()
            except Exception as exp:
                print("Got %s" % exp)
            else:
                for result in results:
                    host, prop = result
                    # Save host and prop to DB

def fetch_hosts(group_ids):
    with Pool(max_workers=4) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()#this is a list
            except Exception as exp:
                print("Got %s" % exp)
            else:
                print("Fetching hosts for {}".format(group_id))
                fetch_property(hosts)

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))
