如何在不使用全局参数的情况下收集多线程时的函数返回值?

2024-06-16 09:45:46 发布

您现在位置:Python中文网/ 问答频道 /正文

所以我试图找到一个通用的解决方案,它将从一个函数中收集所有的值,并将它们附加到一个稍后可以访问的列表中。这将在concurrent.futuresthreading类型的任务中使用。下面是我使用全局master_list的解决方案:

from concurrent.futures import ThreadPoolExecutor

master_list = []
def return_from_multithreaded(func):
    # master_list = []
    def wrapper(*args, **kwargs):
        # nonlocal master_list
        global master_list
        master_list += func(*args, **kwargs)
    return wrapper


@return_from_multithreaded
def f(n):
    return [n]


with ThreadPoolExecutor(max_workers=20) as exec:
    exec.map(f, range(1, 100))

print(master_list)

我想找到一个不包含全局变量的解决方案,也许可以返回存储为闭包的注释输出master_list?在


Tags: frommasterreturndefargs解决方案wrapperconcurrent
2条回答

我以前也遇到过这个问题:Running multiple asynchronous function and get the returned value of each function。这就是我的方法:

def async_call(func_list):
    """
    Runs the list of function asynchronously.

    :param func_list: Expects list of lists to be of format
        [[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
    :return: List of output of the functions
        [output1, output2, ...]
    """
    def worker(function, f_args, f_kwargs, queue, index):
        """
        Runs the function and appends the output to list, and the Exception in the case of error
        """
        response = {
            'index': index,  # For tracking the index of each function in actual list.
                             # Since, this function is called asynchronously, order in
                             # queue may differ
            'data': None,
            'error': None
        }

        # Handle error in the function call
        try:
            response['data'] = function(*f_args, **f_kwargs)
        except Exception as e:
            response['error'] = e  # send back the exception along with the queue

        queue.put(response)
    queue = Queue()
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \
                    for i, (func, args, kwargs) in enumerate(func_list)]

    for process in processes:
        process.start()

    response_list = []
    for process in processes:
        # Wait for process to finish
        process.join()

        # Get back the response from the queue
        response = queue.get()
        if response['error']:
            raise response['error']   # Raise exception if the function call failed
        response_list.append(response)

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])]

样本运行:

^{pr2}$

如果不想使用全局变量,请不要放弃map的结果。map返回每个函数返回的值,您只是忽略了它们。通过使用map实现其预期用途,可以使此代码更简单:

def f(n):
    return n  # No need to wrap in list

with ThreadPoolExecutor(max_workers=20) as exec:
    master_list = list(exec.map(f, range(1, 100)))

print(master_list)

如果您需要一个master_list来显示到目前为止计算的结果(可能有其他线程在监视它),那么只需将循环显式化:

^{pr2}$

这就是Executor模型的设计目的;普通线程不打算返回值,但是Executors提供了一个在幕后返回值的通道,因此您不必自己管理它。在内部,这是使用某种形式的队列,使用额外的元数据来保持结果的有序性,但是您不需要处理这种复杂性;从您的角度来看,它相当于常规的map函数,它只是将工作并行化。在


更新以涵盖异常处理:

当结果命中时,map将引发worker中引发的任何异常。因此,如前所述,如果任何任务失败,第一组代码将不存储任何内容(将部分构造list,但当异常引发时将被丢弃)。第二个示例只在抛出第一个异常之前保留结果,其余的则被丢弃(您必须存储map迭代器并使用一些笨拙的代码来避免它)。如果您需要存储所有成功的结果,忽略失败(或只是以某种方式记录它们),最简单的方法是使用submit创建Future对象的list,然后按顺序或按完成顺序等待它们,将.result()调用包装在try/except中,以避免丢掉好的结果。例如,要按提交顺序存储结果,可以执行以下操作:

^{3}$

为了获得更高效的代码,您可以按照完成的顺序检索结果,而不是按提交的顺序,使用^{}在结果完成时急切地检索结果。与以前的代码相比,唯一的变化是:

    for fut in futures:

变成:

    for fut in concurrent.futures.as_completed(futures):

as_completed中,as_completed一旦完成/取消期货,就立即进行{}的工作,而不是推迟到所有先前提交的期货完成并得到处理。在

使用add_done_callback还有更复杂的选项,因此主线程根本不参与显式处理结果,但这通常是不必要的,而且常常令人困惑,因此最好尽可能避免。在

相关问题 更多 >