Python多进程在Nuke中导致Nuke卡住

2 投票
2 回答
1503 浏览
提问于 2025-04-18 18:44

我有一段代码,运行后导致Nuke软件卡住。简单来说,我想从文件系统中获取文件和文件夹的列表,并尝试通过并行处理来加快这个过程。在Nuke以外的环境中,这个方法运行得很好,但在Nuke中执行时就会导致软件卡住。有没有更好的方法可以避免Nuke卡住呢?我希望能通过Python的标准库或者一些不依赖于特定平台的包来解决这个问题。如果实在没有办法,那也没关系。最坏的情况就是我得放弃并行处理,寻找其他优化的方法。

另外,当我在Nuke中运行这段代码时,控制台会出现以下错误:

Unknown units in -c from multiprocessing.forking import main; main()

代码如下:

#!/bin/env python

import multiprocessing
import os

CPU_COUNT = multiprocessing.cpu_count()


def _threaded_master(root):
    in_queue = multiprocessing.JoinableQueue()
    folder_queue = multiprocessing.JoinableQueue()
    file_queue = multiprocessing.JoinableQueue()

    in_queue.put(root)

    for _ in xrange(CPU_COUNT):
        multiprocessing.Process(target=_threaded_slave, args=(in_queue, folder_queue, file_queue)).start()

    in_queue.join()

    return {"folders": folder_queue, "files": file_queue}


def _threaded_slave(in_queue, folder_queue, file_queue):
    while True:
        path_item = in_queue.get()

        if os.path.isdir(path_item):
            for item in os.listdir(path_item):
                path = os.path.join(path_item, item)
                in_queue.put(path)

        in_queue.task_done()


if __name__ == "__main__":
    print _threaded_master(r"/path/to/root")

2 个回答

1

这里有一个链接可以参考: https://learn.foundry.com/nuke/developers/63/pythondevguide/threading.html

值得注意的是,里面提到的警告: nuke.executeInMainThreadnuke.executeInMainThreadWithResult 应该始终在子线程中运行。如果在主线程中运行,它们会导致NUKE卡住。

所以,创建一个新的子线程,在那里进行你的操作。

2

这是我用多个线程扫描一个大目录树的代码。

我最开始是用传统的 multiprocessing.Pool() 来写这个代码,因为它非常简单,而且能直接得到函数的结果。这样就不需要输入和输出队列了。另一个不同之处是它使用的是 进程 而不是 线程,这两者各有利弊。

不过,Pool 有一个大缺点:它假设你有一个固定的待处理项目列表。

所以,我根据你们的原始示例重写了代码:使用输入/输出队列来处理目录,还有一个输出队列。调用者需要明确地从输出队列中获取项目。

为了好玩,我还和传统的 os.walk() 做了个时间对比……至少在我的机器上,传统的解决方案更快。两个方案处理的文件数量差别很大,我也说不清楚为什么。

祝你玩得开心!

源代码

#!/bin/env python

import multiprocessing, threading, time
import logging, os, Queue, sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-4s %(levelname)s %(threadName)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)

def scan_dir(topdir):
    try:
        for name in os.listdir(topdir):
            path = os.path.join(topdir, name)
            yield (path, os.path.isdir(path))
    except OSError:
        logging.error('uhoh: %s', topdir)

def scan_dir_queue(inqueue, outqueue):
    logging.info('start')
    while True:
        try:
            dir_item = inqueue.get_nowait()
        except Queue.Empty:
            break

        res = list( scan_dir(dir_item) )
        logging.debug('- %d paths', len(res))
        for path,isdir in res:
            outqueue.put( (path,isdir) )
            if isdir:
                inqueue.put(path)
    logging.info('done')

def thread_master(root):
    dir_queue = Queue.Queue() # pylint: disable=E1101
    dir_queue.put(root)
    result_queue = Queue.Queue()

    threads = [
        threading.Thread(
            target=scan_dir_queue, args=[dir_queue, result_queue]
        )
        for _ in range(multiprocessing.cpu_count())
    ]

    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result_queue.queue

if __name__ == "__main__":
    topdir = os.path.expanduser('~')

    start = time.time()
    res = thread_master(topdir)
    print 'threaded:', time.time() - start
    print len(res), 'paths'

    def mywalk(topdir):
        for (dirpath, _dirnames, filenames) in os.walk(topdir):
            for name in filenames:
                yield os.path.join(dirpath, name)
    start = time.time()
    res = list(mywalk(topdir))
    print 'os.walk:', time.time() - start
    print len(res), 'paths'

输出

11:56:35 INFO Thread-1 start
11:56:35 INFO Thread-2 start
11:56:35 INFO Thread-3 start
11:56:35 INFO Thread-4 start
11:56:35 INFO Thread-2 done
11:56:35 INFO Thread-3 done
11:56:35 INFO Thread-4 done
11:56:42 INFO Thread-1 done
threaded: 6.49218010902
299230 paths
os.walk: 1.6940600872
175741 paths

撰写回答