在Python中动态加载模块(+多进程问题)

0 投票
2 回答
913 浏览
提问于 2025-04-15 23:22

我正在写一个Python包,它可以从配置文件中读取模块列表和一些附加数据。

接下来,我想逐个处理这些动态加载的模块,并在每个模块中调用一个叫做do_work()的函数,这个函数会启动一个新进程,这样代码就可以在一个单独的进程中异步运行。

目前,我在主脚本的开头导入了所有已知模块的列表,我觉得这样做很糟糕,因为不够灵活,而且维护起来也很麻烦。

这是用来启动进程的函数。我想修改它,使得在遇到模块时能够动态加载。字典中的键是包含代码的模块名称:

def do_work(work_info):
  for (worker, dataset) in work_info.items():
    #import the module defined by variable worker here...

    # [Edit] NOT using threads anymore, want to spawn processes asynchronously here...

    #t = threading.Thread(target=worker.do_work, args=[dataset])
    # I'll NOT dameonize since spawned children need to clean up on shutdown
    # Since the threads will be holding resources
    #t.daemon = True
    #t.start()

问题 1

当我在脚本中调用这个函数时(如上所示),我遇到了以下错误:

AttributeError: 'str'对象没有 'do_work'属性

这很合理,因为字典的键是一个字符串(要导入的模块名称)。

当我在启动线程之前添加以下语句:

import worker

时,我得到了这个错误:

ImportError: 没有名为worker的模块

这很奇怪,因为我使用的是变量名而不是它所持有的值 - 当我打印这个变量时,我得到了预期的值,这到底是怎么回事?

问题 2

正如我在评论区提到的,我意识到在子进程中执行的do_work()函数需要自己清理。我理解的是,应该写一个clean_up函数,当do_work()成功完成或捕获到未处理的异常时调用 - 还有什么其他需要做的,以确保资源不会泄漏或让操作系统处于不稳定状态吗?

问题 3

如果我注释掉t.daemon标志的语句,代码还会异步运行吗?子进程执行的工作相当繁重,我不想等一个子进程完成后再启动另一个。顺便说一下,我知道Python中的线程实际上是一种时间共享/切片 - 这没问题。

最后,有没有更好的(更符合Python风格的)方法来做我想做的事情?

[编辑]

在多了解一些关于Python的GIL和线程(嗯,黑科技)的内容后,我觉得最好使用独立的进程(至少如果我理解正确的话,脚本可以利用可用的多个进程),所以我将会启动新的进程而不是线程。

我有一些用于启动进程的示例代码,但它有点简单(使用了lambda函数)。我想知道如何扩展它,以便能够处理在加载的模块中运行函数(就像我上面所做的那样)。

这是我现有代码的一部分:

def do_mp_bench():
    q = mp.Queue() # Not only thread safe, but "process safe"
    p1 = mp.Process(target=lambda: q.put(sum(range(10000000))))
    p2 = mp.Process(target=lambda: q.put(sum(range(10000000)))) 
    p1.start()
    p2.start()
    r1 = q.get()
    r2 = q.get()
    return r1 + r2

我该如何修改它,以处理一个模块字典,并在每个加载的模块中以新进程运行do_work()函数?

2 个回答

2

问题1:使用 __import__()

问题2:为什么不在 do_work() 函数的最后进行清理呢?

问题3:如果我没记错的话,守护线程的意思就是程序不会自动等这个线程结束。

1

这段内容经过修改,使用了import()的相关文档,具体可以查看这里:import。同时也进行了重构,以便使用请求的多进程模块,相关文档可以在这里找到:multiprocessing。不过,这个还没有经过测试。

def do_work(work_info):
    q = mp.Queue()
    for (worker, dataset) in work_info.items():
      xworker = __import__(worker)
      p = mp.Process(target=xworker.do_work, args=dataset).start()
      q.put(p)
    while not q.empty():
      r = q.get()

撰写回答