How can I fix multithreading/multiprocessing with dictionaries?

Published 2024-04-25 04:33:04


I have over 100,000 API calls to make, using two functions: the first hits the API and gets the sysinfo (a dict) for each host, and the second parses the sysinfo and gets the IP addresses. I'm looking for a way to speed this up, but I've never used multiprocessing/threading before (currently it takes about 3 hours).

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

#pool = ThreadPool(4)
p = Pool(5)

#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down. 

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    return sysinfo

def get_ips_from_sysinfo(self, sysinfo):
    sysinfo = sysinfo["data"]
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ips = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for i in ip_info:
            ips.append(i)
    return ips

if __name__ == "__main__":
    for i in ids:
        sysinfo = rr.get_sys_info(i, appliance)
        hostname = sysinfo.get("data", {}).get("hostname")
        try:
            ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
        except Exception as e:
            rr.logger.error("Exception on {} -- {}".format(hostname, e))
            continue

#Tried calling it here
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))

I have to make over 100,000 of these API calls, and this really is the part that slows everything down.

I think I've tried everything, and I've gotten every possible error.

I would really appreciate any kind of help. Thank you!


3 Answers

You can use threads and a queue to communicate. First start get_ips_from_sysinfo as a single thread to watch for and process any finished sysinfo, storing the output in output_list, then launch all the get_sys_info threads, being careful not to exhaust memory with 100k threads:

from threading import Thread
from queue import Queue

jobs = Queue()  # buffer for sysinfo
output_list = []  # store ips

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)  # add sysinfo to jobs queue
    return sysinfo  # comment if you don't need it

def get_ips_from_sysinfo(self):
    """it will run contineously untill finish all jobd"""
    while True:
        # get sysinfo from jobs queue
        sysinfo = jobs.get()  # it will wait here for new entry
        if sysinfo == 'exit':
            print('we are done here')
            break

        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)


if __name__ == "__main__":
    # start our listener thread
    Thread(target=rr.get_ips_from_sysinfo).start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for threads to finish, then terminate get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()

    jobs.put('exit')
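On the "don't run out of memory with 100k threads" point above, one common pattern is a fixed pool of worker threads that all pull host ids from the same queue, instead of one thread per id. This is a minimal, self-contained sketch of that idea; `fake_get_sys_info` and the worker count are made-up stand-ins for the real API call and whatever concurrency your API tolerates:

```python
from queue import Queue
from threading import Thread

NUM_WORKERS = 50  # assumption: tune this to your API's rate limits
ids_queue = Queue()
results = []  # list.append is atomic under the GIL, so this is safe here

def fake_get_sys_info(host_id):
    # stand-in for the real API call
    return {"host_id": host_id}

def worker():
    while True:
        host_id = ids_queue.get()
        if host_id is None:  # sentinel: no more work for this worker
            break
        results.append(fake_get_sys_info(host_id))

threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for host_id in range(1000):  # enqueue all the host ids
    ids_queue.put(host_id)
for _ in threads:            # one sentinel per worker so every thread exits
    ids_queue.put(None)
for t in threads:
    t.join()
```

With this shape, memory use is bounded by the queue and the 50 worker threads, no matter how many ids you enqueue.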

As @wwii commented, concurrent.futures offers some conveniences that can help you, especially since this looks like a batch job.

Your performance hit most likely comes from the network calls, so multithreading is probably a better fit for your use case (here is a comparison with multiprocessing). If not, you can switch the pool from threads to processes while using the same APIs.

from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs

def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method"""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance

# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds 
    in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = {executor.submit(thread_worker, instance, _id, _app): (_id, _app)
            for instance, _id, _app in zip(instances, all_host_ids, all_appliances)}

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            try:
                _sysinfo, _instance = future.result()  # re-raises any worker exception
            except Exception as exc:  # just one way of handling exceptions
                print(f"{pool[future]} raised {exc}")
            else:
                # enqueue results for parallel processing in a separate stage, or
                # process the results serially
                ips = _instance.get_ips_from_sysinfo(_sysinfo)
                # do something with `ips`

If your methods really don't use state, as in your example code, you can simplify this example by refactoring the methods into plain functions.
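For instance, get_ips_from_sysinfo never touches self, so it can be a module-level function passed straight to executor.map (which also makes it picklable if you later switch to a ProcessPoolExecutor). A minimal sketch; the sample dict is made up to mirror the structure in the question:

```python
from concurrent.futures import ThreadPoolExecutor

def get_ips_from_sysinfo(sysinfo):
    # plain function: no `self`, so it also pickles cleanly for process pools
    ips = []
    network_info = sysinfo.get("data", {}).get("networkArray", {}).get("networkInfo", [])
    for ni in network_info:
        ips.extend(ni.get("ipArray", {}).get("ipInfo", []))
    return ips

# made-up sysinfo dict mirroring the structure in the question
sample = {"data": {"networkArray": {"networkInfo": [
    {"ipArray": {"ipInfo": ["10.0.0.1", "10.0.0.2"]}},
]}}}

with ThreadPoolExecutor(max_workers=4) as executor:
    all_ips = list(executor.map(get_ips_from_sysinfo, [sample, sample]))
```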

If extracting the sysinfo data is expensive, you can queue the results and feed them to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts.

For whatever reason, I was a little wary of calling instance methods from multiple threads, but it seems to work. I made this toy example with concurrent.futures; hopefully it models your actual situation well enough. It submits 4000 instance-method calls to a thread pool of (up to) 500 workers. Playing with the max_workers value, I found that execution time improved pretty much linearly up to about 1000 workers, and then the rate of improvement tailed off.

import concurrent.futures, time, random

a = [.001*n for n in range(1,4001)]

class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'
    def apicall(self,n):
        wait = random.choice(a)
        time.sleep(wait)
        return (n,wait, self.name)

f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
#        t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
#        t = time.time() - t
#    q = sum(r[1] for r in results)
#    print(f'# workers:{nworkers} - ratio:{q/t}')

I didn't account for exceptions that might be thrown during the method calls, but the example in the docs is pretty clear about how to handle them.
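For reference, the usual pattern from the docs is that future.result() re-raises whatever the worker raised, so you wrap it in try/except inside the as_completed loop. A small sketch; `flaky` is a made-up stand-in for an API call that sometimes fails:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def flaky(n):
    # made-up stand-in for an API call that sometimes fails
    if n % 2:
        raise ValueError("bad host {}".format(n))
    return n * 10

ok, failed = [], []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(flaky, n): n for n in range(6)}
    for future in as_completed(futures):
        try:
            ok.append(future.result())  # re-raises the worker's exception here
        except ValueError:
            failed.append(futures[future])  # map the future back to its input
```

Mapping each future back to its input (the dict used as `futures` here) is what lets you log which host failed, as in the question's error handler.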
