我使用Dask运行一个任务池,按照结果由as_completed
方法完成的顺序检索结果,并可能在每次返回时向池提交新任务:
# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)
while True:
# Wait for a job to finish
f, result = next(pool)
# Exit condition
if result == 'STOP':
break
# Do processing and maybe submit more jobs
more_jobs = process_result(f, result)
more_futures = [client.submit(job.run_simulation) for job in more_jobs]
pool.update(more_futures)
我的问题是:我提交的函数job.run_simulation
有时会挂起很长一段时间,我想让这个函数超时—如果运行时间超过某个时间限制,就终止任务并继续运行。在
理想情况下,我希望执行类似client.submit(job.run_simulation, timeout=10)
的操作,如果任务运行时间超过超时时间,next(pool)
返回{
有什么办法可以让达斯克帮我暂停这样的工作吗?在
我目前所做的努力
我的第一反应是在job.run_simulation
函数本身独立于Dask来处理超时。我看到了两种类型的建议(例如here)用于泛型Python超时。在
1)使用两个线程,一个用于函数本身,另一个用于计时器。我的印象是这实际上行不通,因为你不能杀死线程。即使计时器用完,两个线程都必须在任务完成之前完成。在
2)使用两个独立的进程(使用multiprocessing
模块),一个用于函数,一个用于计时器。这是可行的,但由于我已经在Dask生成的守护程序子进程中,所以不允许创建新的子进程。在
第三种可能是将代码块移动到我使用subprocess.run
运行的单独脚本,并使用subprocess.run
内置的超时。我可以这样做,但这感觉像是最坏情况下的回退场景,因为它需要在子流程之间来回传递大量数据。在
所以感觉我必须在Dask级别完成超时。我的一个想法是在向Dask提交任务的同时创建一个计时器作为子进程。然后,如果计时器用完,请使用Client.cancel()
停止任务。这个计划的问题是Dask可能会在任务开始之前等待工人释放出来,我不希望计时器在任务实际运行之前运行。在
Client.cancel
如果函数已经启动,则无法停止该函数的运行。这些函数在线程池中运行,因此会遇到“无法停止线程”的限制。Dask worker只是Python进程,具有相同的能力和限制。在你说你不能在守护进程中使用进程。解决此问题的一个解决方案是通过以下方式之一更改使用流程的方式:
如果你使用达斯克。分布式在一台机器上,不要使用进程
multiprocessing-context
配置设置为"spawn"
,而不是fork或forkserver解决这个问题的干净方法是在函数
job.run_simulation
内部解决它。理想情况下,您将能够将此超时逻辑推送到该代码,并使其干净地提升。在相关问题 更多 >
编程相关推荐