Multithreaded Python file system crawler

Asked 2025-04-16 10:36

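I wrote a Python function that finds files in the file system according to a given directory pattern, and can optionally perform some "action" at each level. I then tried to parallelize it, because some of the files live on network shares and I want to minimize blocking on I/O. I started with the multiprocessing Pool class because it was the most convenient... (really, is there no Pool class for threads?) My function resolves as much of the supplied file-system pattern as it can and submits the newly found paths to the pool, until no new paths come back. This worked well when I used the function and class directly, but now that I want to call the function from another class, my program seems to hang. To simplify things, I rewrote the function with threads instead of processes, and even wrote a simple thread-pool class... and still got the same problem. Here is a heavily simplified version of the code that still shows the same problem: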

File test1.py:
------------------------------------------------

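import os
import glob
from multiprocessing import Pool

def mapGlob(pool, paths, pattern):
    # Expand `pattern` under every path in parallel and flatten the matches.
    results = []
    paths = [os.path.join(p, pattern) for p in paths]
    for result in pool.map(glob.glob, paths):
        results += result
    return results

def findAllMyPaths():
    pool = Pool(10)
    try:
        paths = ['/Volumes']
        # One glob level per pattern; each level's matches feed the next.
        follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
        for pattern in follow:
            paths = mapGlob(pool, paths, pattern)
        return paths
    finally:
        # Shut the worker processes down once all results are in.
        pool.close()
        pool.join()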
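On the aside in the question ("is there no Pool class for threads?"): the standard library does ship one, multiprocessing.pool.ThreadPool (also reachable as multiprocessing.dummy.Pool), with the same map interface as the process pool; concurrent.futures.ThreadPoolExecutor is another option. Below is a minimal sketch of the same crawler on threads, which suit I/O-bound globbing over network shares. The names find_all_paths, root, follow and workers are illustrative, not taken from the question.

```python
import glob
import os
from multiprocessing.pool import ThreadPool


def map_glob(pool, paths, pattern):
    """Expand `pattern` under every path in parallel and flatten the matches."""
    targets = [os.path.join(p, pattern) for p in paths]
    results = []
    for matches in pool.map(glob.glob, targets):
        results.extend(matches)
    return results


def find_all_paths(root, follow, workers=10):
    """Resolve one glob level per entry in `follow`, starting from `root`."""
    paths = [root]
    # Threads share the interpreter, so nothing has to be pickled or re-imported.
    with ThreadPool(workers) as pool:
        for pattern in follow:
            paths = map_glob(pool, paths, pattern)
    return paths
```

ThreadPool's context manager calls terminate() on exit, which is safe here because every map() call has already returned its results. The usage mirrors the question: find_all_paths('/Volumes', ['**', 'ptid_*', 'expid_*', 'slkid_*']).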


File test2.py:
----------------------------------------------------------------------------

from test1 import findAllMyPaths

# Runs the whole crawl (and its pool) while test2 itself is still being imported.
allmypaths = findAllMyPaths()

Now if I call

>>> from test1 import findAllMyPaths
>>> findAllMyPaths()
...long list of all the paths

this works fine, but if I try:

>>> from test2 import allmypaths

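Python just hangs. The action function (glob in this example) gets called, but never seems to return... I need help. When it works, the parallel version is much faster (6 to 20 times, depending on which "action" is mapped at each point in the file-system tree), so I would like to be able to use it.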
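When a pool.map call "never seems to return", one way to turn the silent hang into a visible error while debugging is to use map_async() and wait on the result with a timeout: AsyncResult.get(timeout) raises multiprocessing.TimeoutError instead of blocking forever. A minimal sketch, assuming a 60-second default is a reasonable bound for one glob level (the function name and timeout are illustrative):

```python
import glob
import multiprocessing
import os


def map_glob_with_timeout(pool, paths, pattern, timeout=60.0):
    """Like the question's mapGlob, but fail loudly instead of waiting forever.

    `timeout` is in seconds; the default is an illustrative value only.
    """
    targets = [os.path.join(p, pattern) for p in paths]
    # map_async() returns an AsyncResult immediately; get(timeout) waits at most
    # `timeout` seconds and raises multiprocessing.TimeoutError if results are late.
    try:
        batches = pool.map_async(glob.glob, targets).get(timeout)
    except multiprocessing.TimeoutError:
        raise RuntimeError(
            "glob over %d path(s) did not finish within %.1f s" % (len(targets), timeout)
        )
    return [match for batch in batches for match in batch]
```

This only reports the stall; it does not unblock or kill the stuck workers, so the pool should still be terminated afterwards.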

Also, if I change the map function to a non-parallel version:

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

everything works fine.

Edit:

I turned on debug logging in multiprocessing to see whether it would tell me more. In the working case I get:

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

When it doesn't work, all I get is:

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
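The logs above come from multiprocessing's internal logger. One way to get output in this format (an assumption about how it was enabled here) is multiprocessing.log_to_stderr(), whose default format is "[LEVEL/ProcessName] message"; the SUBDEBUG lines only appear when the level is lowered to multiprocessing.util.SUBDEBUG (5). The helper name below is hypothetical.

```python
import multiprocessing
import multiprocessing.util


def enable_multiprocessing_logging(level=multiprocessing.util.SUBDEBUG):
    """Route multiprocessing's internal log records to stderr at `level`."""
    # log_to_stderr() attaches a StreamHandler to the package logger, sets the
    # level when one is given, and returns that same logger.
    return multiprocessing.log_to_stderr(level)
```

Call it once, before creating the Pool; passing logging.DEBUG instead hides the SUBDEBUG finalizer lines.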

2 Answers


If I'm not mistaken, test2.py should look like this:

from test1 import findAllMyPaths
allmypaths = findAllMyPaths

and then

from test2 import allmypaths
allmypaths()
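The idea in this answer is that test2 should not run the crawl as a side effect of being imported, which matches the question's observation that calling findAllMyPaths() after the import works. A sketch of a variant that still exposes a single name in test2 but defers the work to the first call and caches the result with functools.lru_cache. Here find_all_my_paths is a hypothetical stand-in for test1.findAllMyPaths (returning made-up paths) so the block runs on its own; whether deferring alone cures the hang in every setup is an assumption.

```python
import functools


def find_all_my_paths():
    # Hypothetical stand-in for test1.findAllMyPaths; the real one runs the pool.
    return ["/Volumes/a", "/Volumes/b"]


@functools.lru_cache(maxsize=None)
def all_my_paths():
    """Run the crawl on the first call (not at import time) and cache it."""
    # A tuple is returned so callers cannot mutate the cached result.
    return tuple(find_all_my_paths())
```

Callers then write "from test2 import all_my_paths" followed by "paths = all_my_paths()"; the pool work happens after the import has finished, and later calls reuse the cached tuple.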

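This isn't a complete solution, but I found a way to make the code work in both situations: when run from the interactive interpreter and when used from a running script. I think the problem is related to the following note in the multiprocessing documentation: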

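Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples will not work in the interactive interpreter.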

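I don't really understand why this restriction exists, or why I can sometimes use the pool from the interactive interpreter and sometimes not, but oh well...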

To work around it, I do this in every module that might use multiprocessing:

import __main__
# In an interactive interpreter session, __main__ has no __file__ attribute.
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__, '__file__'):
    __SHOULD_MULTITHREAD__ = True

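Other code in the module can then check this flag to decide whether to use the pool or just run serially. That way I can still use and test the parallelized functions from the interactive interpreter; they just run a lot slower.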
