How can I run os.walk in parallel in Python?

Posted 2024-04-29 11:08:31


I wrote a simple application in Java that takes a list of paths and produces a file containing the paths of all files under those folders.

For example, if paths.txt contains:

c:\folder1\
c:\folder2\
...
...
c:\folder1000\

My application runs a recursive function on each path, one thread per path, and returns a single file containing the paths of all files under those folders.

Now I want to write the same application in Python.

I have written a simple version that uses os.walk() to walk a given folder and print the file paths it finds to the output.

Now I want to run it in parallel, and I have seen that Python has modules for this: threading and multiprocessing.

Which is the best approach, and how would I go about it?


3 Answers

Here is a threading pattern that has worked well for me in Python. I'm not sure, though, whether threads will actually improve performance here, because of how threading works in CPython (the GIL).

import threading
import queue
import os

class PathThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        # walk the tree rooted at p and print every file path found
        for path, dirs, files in os.walk(p):
            for f in files:
                print(path + "/" + f)

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()

# thread-safe queue of directories to scan
pathqueue = queue.Queue()
paths = ["foo", "bar", "baz"]

# spawn worker threads
for i in range(5):
    t = PathThread(pathqueue)
    t.daemon = True
    t.start()

# add paths to the queue
for path in paths:
    pathqueue.put(path)

# block until every queued path has been processed
pathqueue.join()
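Whether threads help depends on where the time goes: os.walk is I/O-bound and the GIL is released during the underlying filesystem calls, so threads usually do give a speedup here, especially on network shares. For comparison, here is a minimal sketch of the same idea using the standard-library concurrent.futures.ThreadPoolExecutor; the folder list is a placeholder you would replace with the entries from paths.txt:

import os
from concurrent.futures import ThreadPoolExecutor

def list_files(root):
    """Walk one root folder and return all file paths under it."""
    found = []
    for path, dirs, files in os.walk(root):
        for name in files:
            found.append(os.path.join(path, name))
    return found

paths = ["foo", "bar", "baz"]  # placeholder folder list

# one worker thread per root folder, capped at 5
with ThreadPoolExecutor(max_workers=5) as executor:
    for file_list in executor.map(list_files, paths):
        for filepath in file_list:
            print(filepath)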

And here is a multiprocessing solution:

from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os

def explore_path(path):
    directories = []
    nondirectories = []
    for filename in os.listdir(path):
        fullname = os.path.join(path, filename)
        if os.path.isdir(fullname):
            directories.append(fullname)
        else:
            nondirectories.append(filename)
    # write the plain files found in this directory to their own output file
    outputfile = path.replace(os.sep, '_') + '.txt'
    with open(outputfile, 'w') as f:
        for filename in nondirectories:
            print(filename, file=f)
    return directories

def parallel_worker():
    while True:
        path = unsearched.get()
        dirs = explore_path(path)
        for newdir in dirs:
            unsearched.put(newdir)
        unsearched.task_done()

# acquire the list of paths
with open('paths.txt') as f:
    paths = f.read().split()

unsearched = Queue()
for path in paths:
    unsearched.put(path)

pool = Pool(5)
for i in range(5):
    pool.apply_async(parallel_worker)

unsearched.join()
print('Done')
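Note that this relies on the worker processes inheriting the global unsearched queue, which works when processes are started with fork (the traditional default on Linux) but not with spawn (the default on Windows and macOS). If, as in the original question, you already have a fixed list of top-level folders, a simpler variant is to let each worker process walk one folder with os.walk and map over the list. A rough sketch, where the paths.txt format and the output file name all_files.txt are assumptions:

import os
from multiprocessing import Pool

def walk_one(root):
    """Walk a single top-level folder and return every file path under it."""
    found = []
    for path, dirs, files in os.walk(root):
        for name in files:
            found.append(os.path.join(path, name))
    return found

if __name__ == '__main__':
    # one folder per line, as in the question's paths.txt
    with open('paths.txt') as f:
        roots = [line.strip() for line in f if line.strip()]

    # one process per CPU-friendly chunk of folders
    with Pool(5) as pool:
        results = pool.map(walk_one, roots)

    # hypothetical output file collecting everything
    with open('all_files.txt', 'w') as out:
        for file_list in results:
            for filepath in file_list:
                print(filepath, file=out)

This only parallelises across the top-level folders, so it balances poorly if one folder is much larger than the others, whereas the queue-based version above redistributes work at every directory level.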

Even plain threads can help a great deal with directory traversal. I used the code below to walk a SharePoint tree and got a fairly significant speedup with about 50 threads. This particular program returns (path, data) pairs for every XML file in the directory structure, and can easily be extended for your own use. (It is cut and pasted from my program; some additional editing is still needed.)

import queue
import sys
import threading
import time
import traceback
from datetime import datetime
from os import listdir
from os.path import getmtime, isdir, join

class ScanError(Exception):
    """Raised in the main thread when a worker reports an error."""

#unique string for passing error messages
ERROR = '\xffERROR\xff'

class ScanWorker(threading.Thread):
    """Worker class for scanning directory structures.
    pathQueue: queue for pathnames of directories
    resultQueue: results of processFile, pairs of (path, data) to be updated
    """
    lock = threading.Lock()
    dirCount = 0
    def __init__(self, pathQueue, resultQueue):
        self.pathQueue = pathQueue
        self.resultQueue = resultQueue
        super().__init__()

    def run(self):
        """Worker thread.
        Get a directory, process it, and put new directories on the
        queue."""
        try:
            while True:
                self.processDir(self.pathQueue.get())
                self.pathQueue.task_done()
        except Exception as e:
            #pass on exception to main thread
            description = traceback.format_exception(*sys.exc_info())
            description.insert(0,
                "Error in thread {}:\n".format(
                    threading.current_thread().name))
            self.resultQueue.put((ERROR, description))
            self.pathQueue.task_done()

    def processDir(self, top):
        """Visit a directory
        Call self.processFile on every file, and queue the directories.
        """
        #Wait and retry a few times in case of network errors.
        #SharePoint is not reliable, gives errors for no reason
        for retryCount in range(30):
            try:
                names = listdir(top)
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="L", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("List: too many retries")
            raise lastError
        #it is not important to worry about race conditions here
        self.__class__.dirCount += 1
        #process contents
        for name in names:
            if isdir(join(top, name)): self.pathQueue.put(join(top, name))
            else: self.processFile(join(top, name))

    def processFile(self, path):
        """Get XML file.
        """
        #only xml files
        if not path.lower().endswith('.xml'): return
        filemtime = datetime.fromtimestamp(getmtime(path))
        #SharePoint is not reliable, gives errors for no reason; just retry
        for retryCount in range(30):
            try:
                data = open(path,'rb').read()
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="R", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("Read: too many retries")
            raise lastError
        self.resultQueue.put((path, data))

class Scanner:
    """Interface to the ScanWorkers
    Sharepoint is pretty fast compared to its delay and handles 50 workers well
    Make sure you only create one instance of Scanner!
    """
    def __init__(self, workers):
        #don't restrict the path queue length; this causes deadlock
        #we use a LIFO queue to get more depth-first like search
        #reducing average queue length and hopefully improving server caching
        self.pathQueue = queue.LifoQueue()
        #this is the output queue to the main thread
        self.resultQueue = queue.Queue(5)
        self.workers = workers
        #start workers
        for i in range(workers):
            t = ScanWorker(self.pathQueue, self.resultQueue)
            t.daemon = True
            t.start()

    def startWorkers(self, path):
        #add counter
        self.added = 0
        #and go
        self.pathQueue.put(path)

    def processResult(self, wait=True):
        """Get an element from the result queue, and add to the zip file."""
        path, data = self.resultQueue.get(block=wait)
        if path==ERROR:
            #process gave alarm; stop scanning
            #pass on description
            raise ScanError(data)
        # <do whatever you want to do with the file>
        self.resultQueue.task_done()
        self.added += 1

#main
try:
    #set up
    scanner = Scanner(threads)
    scanner.startWorkers(rootpath)
    pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
    #scanner is rolling; wait for it to finish
    with pathQueue.all_tasks_done:
        while pathQueue.unfinished_tasks:
            #tasks are still running
            #process results
            while True:
                try: scanner.processResult(wait=False)
                except queue.Empty: break
            #no new files found; check if scanner is ready
            done = pathQueue.all_tasks_done.wait(timeout=1)
            if not done:
                #Not yet; print something while we wait
                print(
                    "\rProcessed {} files from {} directories [{} {}]  "
                    .format(
                        scanner.added,
                        ScanWorker.dirCount,
                        pathQueue.unfinished_tasks,
                        resultQueue.unfinished_tasks,
                    ), end='\r')
    #just to make sure everybody is ready: join the path queue
    pathQueue.join()
    #process the rest of the result queue
    while resultQueue.unfinished_tasks: scanner.processResult(wait=True)
    #go to new line to prevent overwriting progress messages
    print()
except ScanError as e:
    print()
    print(*e.args[0], end='')
    print("Process interrupted.")
except KeyboardInterrupt:
    print("\nProcess interrupted.")
print()

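The `<do whatever you want to do with the file>` placeholder in processResult is deliberately left to the reader. Since the docstring mentions adding the file to a zip archive, one purely illustrative possibility (self.zipfile is an assumption, not part of the answer's code) would be:

import zipfile
from os.path import relpath

# illustrative only: write each scanned XML file into an archive;
# assumes Scanner.__init__ created self.zipfile = zipfile.ZipFile('scan.zip', 'w')
# and that rootpath is the root passed to startWorkers
self.zipfile.writestr(relpath(path, rootpath), data)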