Python多进程与共享库及内存持久化
我正在用Python来驱动运行成千上万的数值模拟。因为每次模拟的初始化过程都是一样的,所以我可以通过只进行一次(耗时的)初始化,来节省时间。然后把初始状态的向量保存在内存中,接下来每次模拟只需恢复这些向量。
下面是一个简单的示例:
from ctypes import cdll
try:
lib = cdll.LoadLibrary('shared_lib.so')
print lib.set_to_backup() # this tells the program to save the initial state
print lib.simulate("cmd.txt") # initializes, backs up state, saves an internal variable -backed_up- to true and simulates
print lib.simulate("cmd2.txt") # sees the internal state -backed_up- equal to true and skips initialisation, restores from memory and simulates
except:
import traceback
traceback.print_exc()
这个方法运行得很好,我可以进行多个模拟(cmd1, cmd2, ...)而不需要重新初始化。
现在,我想用多进程来加速这个过程。也就是说,每个进程应该只加载一次库,然后运行第一次模拟,进行初始化、保存和模拟。之后的每次模拟应该重新加载初始状态并进行模拟。下面是一个单个进程的示例:
from ctypes import cdll
from multiprocessing import Process
try:
lib = cdll.LoadLibrary('shared_lib.so')
print lib.set_to_backup() # this tells the program to save the initial state
p1 = Process(target=lib.simulate, args=("cmd.txt",)) # initializes, backs up state, saves an internal variable -backed_up- to true and simulates
p1.start()
p1.join()
print p1.exitcode
p2 = Process(target=lib.simulate, args=("cmd2.txt",)) # (should) see the internal state -backed_up- equal to true and skips initialisation, restores from memory and simulates
p2.start()
p2.join()
print p2.exitcode
except:
import traceback
traceback.print_exc()
第一个进程工作得很好(我在跟踪中可以看到)。但是第二个进程在库中看不到-backed_up-这个变量,结果重新初始化了所有内容。
我尝试过不声明新进程,而是简单地重新运行p1.start()来重启同一个进程,但这失败了(assert self._popen is None, '不能两次启动一个进程')。
-backed_up-是库中的一个全局变量,应该在调用lib.simulate之间保持在内存中(就像第一个示例中那样)。
我使用的是Linux Debian 7,Python版本是2.7.3。
有没有人知道怎么才能让这个工作呢?
1 个回答
1
我成功地用一个队列让它工作了。这个回答受到了很大的启发,来源于这个链接 --> https://stackoverflow.com/a/6672593/801468
import multiprocessing
from ctypes import cdll
num_procs = 2
def worker():
lib = cdll.LoadLibrary('shared_lib.so')
print lib.set_to_backup()
for DST in iter( q.get, None ):
print 'treating: ', DST
print lib.simulate(DST)
q.task_done()
q.task_done()
q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
procs.append( multiprocessing.Process(target=worker) )
procs[-1].daemon = True
procs[-1].start()
list_of_DST = ["cmd1.txt", "cmd2.txt"]
for DST in list_of_DST:
q.put(DST)
q.join()
for p in procs:
q.put( None )
q.join()
for p in procs:
p.join()
print "Finished everything...."
print "Active children:", multiprocessing.active_children()