在Python中动态加载模块(+多进程问题)
我正在写一个Python包,它可以从配置文件中读取模块列表和一些附加数据。
接下来,我想逐个处理这些动态加载的模块,并在每个模块中调用一个叫做do_work()的函数,这个函数会启动一个新进程,这样代码就可以在一个单独的进程中异步运行。
目前,我在主脚本的开头导入了所有已知模块的列表,我觉得这样做很糟糕,因为不够灵活,而且维护起来也很麻烦。
这是用来启动进程的函数。我想修改它,使得在遇到模块时能够动态加载。字典中的键是包含代码的模块名称:
def do_work(work_info):
for (worker, dataset) in work_info.items():
#import the module defined by variable worker here...
# [Edit] NOT using threads anymore, want to spawn processes asynchronously here...
#t = threading.Thread(target=worker.do_work, args=[dataset])
# I'll NOT dameonize since spawned children need to clean up on shutdown
# Since the threads will be holding resources
#t.daemon = True
#t.start()
问题 1
当我在脚本中调用这个函数时(如上所示),我遇到了以下错误:
AttributeError: 'str'对象没有 'do_work'属性
这很合理,因为字典的键是一个字符串(要导入的模块名称)。
当我在启动线程之前添加以下语句:
import worker
时,我得到了这个错误:
ImportError: 没有名为worker的模块
这很奇怪,因为我使用的是变量名而不是它所持有的值 - 当我打印这个变量时,我得到了预期的值,这到底是怎么回事?
问题 2
正如我在评论区提到的,我意识到在子进程中执行的do_work()函数需要自己清理。我理解的是,应该写一个clean_up函数,当do_work()成功完成或捕获到未处理的异常时调用 - 还有什么其他需要做的,以确保资源不会泄漏或让操作系统处于不稳定状态吗?
问题 3
如果我注释掉t.daemon标志的语句,代码还会异步运行吗?子进程执行的工作相当繁重,我不想等一个子进程完成后再启动另一个。顺便说一下,我知道Python中的线程实际上是一种时间共享/切片 - 这没问题。
最后,有没有更好的(更符合Python风格的)方法来做我想做的事情?
[编辑]
在多了解一些关于Python的GIL和线程(嗯,黑科技)的内容后,我觉得最好使用独立的进程(至少如果我理解正确的话,脚本可以利用可用的多个进程),所以我将会启动新的进程而不是线程。
我有一些用于启动进程的示例代码,但它有点简单(使用了lambda函数)。我想知道如何扩展它,以便能够处理在加载的模块中运行函数(就像我上面所做的那样)。
这是我现有代码的一部分:
def do_mp_bench():
q = mp.Queue() # Not only thread safe, but "process safe"
p1 = mp.Process(target=lambda: q.put(sum(range(10000000))))
p2 = mp.Process(target=lambda: q.put(sum(range(10000000))))
p1.start()
p2.start()
r1 = q.get()
r2 = q.get()
return r1 + r2
我该如何修改它,以处理一个模块字典,并在每个加载的模块中以新进程运行do_work()函数?
2 个回答
这段内容经过修改,使用了import()的相关文档,具体可以查看这里:import。同时也进行了重构,以便使用请求的多进程模块,相关文档可以在这里找到:multiprocessing。不过,这个还没有经过测试。
def do_work(work_info):
q = mp.Queue()
for (worker, dataset) in work_info.items():
xworker = __import__(worker)
p = mp.Process(target=xworker.do_work, args=dataset).start()
q.put(p)
while not q.empty():
r = q.get()