如何使用所有CPU子处理文件的大列表?

2024-04-20 08:29:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要使用命令行中的LaTeXML库将86000个TEX文件转换为XML。我试图编写一个Python脚本,利用所有4个内核,通过subprocess模块实现自动化。你知道吗

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)

    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return

    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                                   stderr=subprocess.PIPE, 
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
        print(message)

    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                               maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
        pass
    pool.close()
    pool.join()
    print("TIME: {}".format(total_time))

start()

脚本导致Too many open files并减慢我的计算机速度。从ActivityMonitor来看,这个脚本似乎试图一次创建86000个转换子进程,每个进程都试图打开一个文件。也许这是pool.imap_unordered(convert_to_xml, preprints)的结果——也许我不需要将map与subprocess.Popen结合使用,因为我要调用的命令太多了?另一种选择是什么?你知道吗

我花了一整天的时间试图找出处理批量子流程的正确方法。我对Python的这一部分还不熟悉,所以如果您有任何正确方向的建议,我将不胜感激。谢谢!你知道吗


Tags: topath脚本formatmessageconverttimedef
1条回答
网友
1楼 · 发布于 2024-04-20 08:29:07

convert_to_xml中,process = subprocess.Popen(...)语句生成latexml子进程。 如果没有诸如process.communicate()之类的阻塞调用,convert_to_xml结束,即使latexml继续在后台运行。你知道吗

由于convert_to_xml结束,池向关联的工作进程发送另一个要运行的任务,因此convert_to_xml再次被调用。 再次在后台生成另一个latexml进程。 很快,在latexml进程中,您的注意力就到了,打开的文件数量的资源限制也达到了。你知道吗

解决方法很简单:添加process.communicate()告诉convert_to_xml等待latexml进程完成。你知道吗

try:
    process = subprocess.Popen(['latexml', ' dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

关于if __name__ == '__main__'

作为martineau pointed out,有一个warning in the multiprocessing docs 生成新进程的代码不应在模块的顶层调用。 相反,代码应该包含在if __name__ == '__main__'语句中。你知道吗

在Linux中,如果忽略这个警告,就不会发生什么可怕的事情。 但在Windows中,代码“fork-bombs”。或者更准确地说,代码 因为在Windows上fork是通过生成一个新的Python进程来模拟的,该进程随后导入调用脚本。每次导入都会生成一个新的Python进程。每个Python进程都尝试导入调用脚本。直到消耗完所有资源,循环才被打破。你知道吗

所以为了善待我们的Windows fork-bereft兄弟,使用

if __name__ == '__main__:
    start()

有时进程需要大量内存。The only reliable way释放内存就是终止进程。maxtasksperchild=1告诉pool在每个工作进程完成1个任务后终止它。然后它生成一个新的工作进程来处理另一个任务(如果有的话)。这将释放原始辅助进程可能已分配的(内存)资源,这些资源本来无法释放。你知道吗

在您的情况下,工作进程看起来不需要太多内存,因此您可能不需要maxtasksperchild=1。 在convert_to_xml中,process = subprocess.Popen(...)语句生成latexml子进程。 如果没有诸如process.communicate()之类的阻塞调用,convert_to_xml结束,即使latexml继续在后台运行。你知道吗

由于convert_to_xml结束,池向关联的工作进程发送另一个要运行的任务,因此convert_to_xml再次被调用。 再次在后台生成另一个latexml进程。 很快,在latexml进程中,您的注意力就到了,打开的文件数量的资源限制也达到了。你知道吗

解决方法很简单:添加process.communicate()告诉convert_to_xml等待latexml进程完成。你知道吗

try:
    process = subprocess.Popen(['latexml', ' dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

chunksize影响工作进程在将结果发送回主进程之前执行的任务数。 Sometimes这会影响性能,特别是当进程间通信是整个运行时的重要部分时。你知道吗

在您的情况下,convert_to_xml需要相对较长的时间(假设我们等到latexml完成),它只返回None。因此进程间通信可能不是整个运行时的重要部分。因此,我不希望您在这种情况下发现性能上的显著变化(尽管进行实验不会有什么坏处!)。你知道吗


在普通Python中,map不应仅用于多次调用函数。你知道吗

出于类似的风格原因,我会保留使用pool.*map*方法来处理我关心返回值的情况。你知道吗

所以

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
    pass

你可以考虑使用

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))

相反。你知道吗


传递给任何pool.*map*函数的iterable被消耗 立即。iterable是否是迭代器并不重要。根本没有 在这里使用迭代器有特殊的内存优势。imap_unordered返回an 迭代器,但它不处理任何特别友好的迭代器的输入 是的。你知道吗

无论传递哪种类型的iterable,在调用pool.*map*函数时,iterable都是 消耗并转化为放入int的任务o任务队列。你知道吗

以下代码证实了这一说法:

版本1.py:

import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

版本2.py:

import multiprocessing as mp
import time
def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()

    for item in gen():
        result = pool.apply_async(foo, args=(item, ))

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

运行version1.pyversion2.py都会产生相同的结果。你知道吗

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here

最关键的是,您会注意到Got here以最快的速度打印了10次 开始运行,然后有一个长时间的暂停(同时计算) 在程序结束之前。你知道吗

如果发生器gen()pool.imap_unordered缓慢消耗, 我们应该期望Got here的打印速度也很慢。因为Got here是 快速打印10次,我们可以看到iterablegen()正在 在任务完成之前完全消耗掉。你知道吗

运行这些程序会给你信心 pool.imap_unorderedpool.apply_async正在将任务放入队列 基本上是一样的:打电话后马上。你知道吗

相关问题 更多 >