在不耗尽内存的情况下使用并发期货

2024-04-25 00:39:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在做一些文件解析,这是一个CPU限制的任务。不管我在进程中抛出多少文件,它使用的RAM不超过50MB。 该任务是可并行执行的,我已将其设置为使用下面的concurrent futures将每个文件解析为一个单独的进程:

    from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary which will contain a list the future info in the key, and the filename in the value
        jobs = {}

        # Loop through the files, and run the parse function for each file, sending the file-name to it.
        # The results of can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

问题是,当我使用futures、ramusage rockets运行它时,不久我就用完了,Python崩溃了。这在很大程度上可能是因为parse_函数的结果大小为几MB。一旦结果通过post_processing,应用程序就不再需要它们了。如您所见,我正在尝试del jobs[job]清除jobs中的项,但这没有任何区别,内存使用量保持不变,并且似乎以相同的速度增加。在

我还确认了这并不是因为它只使用一个进程等待post_process函数,再加上抛出一个time.sleep(1)。在

在futures文档中没有关于内存管理的内容,虽然简单的搜索表明它以前在futures的实际应用中已经出现过(Clear memory in python loophttp://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures),但答案并没有转化为我的用例(它们都与超时等有关)。在

那么,如何在不耗尽内存的情况下使用并发期货呢? (Python 3.5)


Tags: 文件thein进程asjobsjobresult
2条回答

您可以尝试像这样将del添加到代码中

for job in futures.as_completed(jobs):
    del jobs[job]
    del job #or job._result = None

我来试试(可能猜错了…)

您可能需要一点一点地提交您的工作,因为每次提交时,您都会生成parser_变量的副本,这可能最终会占用您的RAM。在

下面是在有趣的部分使用“<;”的工作代码

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain a list the future info in the key, and the filename in the value
    jobs = {}

    # Loop through the files, and run the parse function for each file, sending the file-name to it.
    # The results of can come back in any order.
    files_left = len(files_list) #<  
    files_iter = iter(files_list) #<   

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break #limit the job submission for now job

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            files_left -= 1 #one down - many to go...   < -

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break; #give a chance to add more jobs <  -

相关问题 更多 >