我正处于重写我的科学分析增量构建框架的早期阶段,以使用难以置信的Dask——这里是Dask.distributed。依赖关系图变得复杂,有些任务运行很长时间(小时、天),而另一些任务运行不了(分钟)。每个任务将其结果与用户生成的密钥一起存储到磁盘(相对而言,pure=True)。 在处理异常时,我希望允许已开始处理且未处于错误状态的任何任务完成,以便如果这些任务是长时间运行的任务,它们的工作不会丢失
我不喜欢为了避免内存消耗而保留未来。因此,一种可能是从调度程序任务状态检索密钥,并通过调用Future(key)重新创建未来,然后使用as_completed来允许这些任务完成。对于某些类型的异常——例如断开的连接——我可能会尝试修复并重新提交错误的任务,并保持整个shebang运行——但可能性不大
奇怪的是,在伟大的文档和教程中,我没有遇到过以这种方式重新创建未来obj的参考。 问题:
一个“有效”的玩具示例——在这里,带有cellc、celld和celle的子图可以在图的其余部分出错后完成。一个主要的区别是,我会为每个任务生成并提交密钥
from dask.distributed import Client, Future
client = Client()
def cella(i, j):
return i * j
def cellb(i, j):
sleep(10)
return i + j
def cellc(i, j):
sleep(10)
return i - j
def celld(x):
return 10/x
def celle(y):
return 10/y
def cellf(z):
return 10/z
def run_top(x, y, z):
futf = client.submit(cellf, z)
fute = client.submit(celle, y)
futd = client.submit(celld, x)
futc = client.submit(cellc, futd, fute)
futb = client.submit(cellb, futd, futf)
futa = client.submit(cella, futb, futc)
try:
result = futa.result()
except ZeroDivisionError as exc:
# potentially fix and resubmit error task identified in exc
ix = [Future(key) for key,v in client.cluster.scheduler.tasks.items() if v.state == 'processing']
for fut in distributed.as_completed(ix):
print(fut,fut.result())
raise exc
else:
return result
r = run_top(1,2,0)
<;未来:成品,类型:内置。浮动,键:cellc-339ad1ba637073781b981b964aca2337>;5.0
ZeroDivisionError回溯(最近一次调用上次)
目前没有回答
相关问题 更多 >
编程相关推荐