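I am using a set of processes to load data, perform some operations on it, and then save the results to disk.

I have posted the main part of this script below. The important aspect is that one group of `mp.Process` objects is responsible for loading, another group of `Process` objects is responsible for running the simulation, and a final group is responsible for saving.

However, my logs show that only the loading part of this pipeline completes. In addition, when I look at `ps`, most of my processes are listed as `defunct`. I think the problem is related to the way I use `join`. Basically, I call `join` because I want the workers to keep processing items in `scatterer_queue` until there are no items left to load.
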
import logging
import multiprocessing as mp
import time

# `scatterer_queue` contains scatterer distributions to be passed to
# simulation.
scatterer_queue = mp.JoinableQueue(maxsize=50)
# `simulated_queue` contains arrays that have already been simulated.
simulated_queue = mp.JoinableQueue(maxsize=50)
filenames = _files_in_directory( ... )
# Create simulator. This will be copied by each `simulation_worker` process.
simulator = simulator.Simulator(...)
# Create `RecordWriter`. This will be used to write out examples.
writer = record_writer.RecordWriter( ... )
# Create loading workers.
loading_worker = mp.Process(target=_load_data, args=(filenames, scatterer_queue))
# Create simulation workers.
simulation_workers = []
for i in range(simulation_worker_count):
    worker = mp.Process(
        target=_simulate,
        args=(simulator.simulate, scatterer_queue, simulated_queue),
    )
    worker.name = "simulation_worker_{}".format(i)
    logging.debug("Instantiating simulation worker {}".format(worker.name))
    worker.daemon = True
    simulation_workers.append(worker)
num_saving_threads = 1
# Create saving workers.
saving_workers = []
for i in range(num_saving_threads):
    worker = mp.Process(
        target=_save,
        args=(writer.save, simulated_queue),
    )
    worker.daemon = True
    saving_workers.append(worker)
### LAUNCH WORKERS ###
# Launch saving workers.
for worker in saving_workers:
    worker.start()
# Launch simulation workers.
for worker in simulation_workers:
    worker.start()
# Launch the loading worker.
loading_worker.start()
logging.debug("BEFORE JOIN")
time.sleep(5.)
# Each `join()` blocks until `task_done()` has been called once for every
# item that was put on that queue.
scatterer_queue.join()
simulated_queue.join()
logging.debug("AFTER JOIN")
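
For reference, here is a minimal, self-contained sketch of the behavior I am aiming for: workers keep consuming from a `JoinableQueue` until nothing is left, and the parent waits for them. The bodies of `_load_data`, `_simulate`, and `_save` are not shown above, so `_worker`, `run_pipeline`, the `None` sentinel, and the "double each item" work are all hypothetical stand-ins, not my actual code. Two facts it relies on: `JoinableQueue.join()` only returns after `task_done()` has been called once per item that was `put()`, and a child that has exited shows up as `defunct` in `ps` until the parent reaps it, for example with `Process.join()`. The sketch also assumes the Unix-only "fork" start method so it can run without a `__main__` guard.

```python
import multiprocessing as mp

# Assumption: use the Unix-only "fork" start method so this sketch runs
# without an `if __name__ == "__main__":` guard.
ctx = mp.get_context("fork")

# Hypothetical marker meaning "no more items will arrive".
_SENTINEL = None


def _worker(in_queue, out_queue):
    """Hypothetical stand-in for `_simulate`: doubles every item it gets."""
    while True:
        item = in_queue.get()
        try:
            if item is _SENTINEL:
                return  # Leave the loop so the process can exit.
            out_queue.put(item * 2)
        finally:
            # Without this call, `in_queue.join()` in the parent never returns.
            in_queue.task_done()


def run_pipeline(items, worker_count=2):
    """Push `items` through `worker_count` workers; return sorted results."""
    items = list(items)
    in_queue = ctx.JoinableQueue(maxsize=50)
    out_queue = ctx.Queue()

    workers = [
        ctx.Process(target=_worker, args=(in_queue, out_queue))
        for _ in range(worker_count)
    ]
    for worker in workers:
        worker.start()

    for item in items:
        in_queue.put(item)
    # One sentinel per worker, so each of them sees exactly one.
    for _ in workers:
        in_queue.put(_SENTINEL)

    # Returns once every item (sentinels included) was marked `task_done()`.
    in_queue.join()

    # Drain the results before joining the processes, so no child is stuck
    # flushing its output while the parent waits for it to exit.
    results = [out_queue.get() for _ in items]

    # Reap the children; until this happens, exited children are `defunct`.
    for worker in workers:
        worker.join()
    return sorted(results)
```

Calling `run_pipeline([1, 2, 3])` should hand back the doubled values once all workers have exited. Whether the missing `task_done()` calls or the missing `Process.join()` calls explain what I am seeing in my own script is only a guess at this point.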