如何允许处理dask期货在异常后完成?

2024-06-16 11:50:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我正处于重写我的科学分析增量构建框架的早期阶段,以使用难以置信的Dask——这里是Dask.distributed。依赖关系图变得复杂,有些任务运行很长时间(小时、天),而另一些任务运行不了(分钟)。每个任务将其结果与用户生成的密钥一起存储到磁盘(相对而言,pure=True)。 在处理异常时,我希望允许已开始处理且未处于错误状态的任何任务完成,以便如果这些任务是长时间运行的任务,它们的工作不会丢失

我不喜欢为了避免内存消耗而保留未来。因此,一种可能是从调度程序任务状态检索密钥,并通过调用Future(key)重新创建未来,然后使用as_completed来允许这些任务完成。对于某些类型的异常——例如断开的连接——我可能会尝试修复并重新提交错误的任务,并保持整个shebang运行——但可能性不大

奇怪的是,在伟大的文档和教程中,我没有遇到过以这种方式重新创建未来obj的参考。 问题:

  1. 如果从键重新创建未来的OBJ,有没有理由不这样做或避免陷阱
  2. 有没有更好的方法来完成这些任务?我知道我的用例有点不寻常,因为我将“中间”任务结果存储到磁盘上——否则,如果较大的图形出错,可能没有什么理由让任务完成

一个“有效”的玩具示例——在这里,带有cellc、celld和celle的子图可以在图的其余部分出错后完成。一个主要的区别是,我会为每个任务生成并提交密钥

from dask.distributed import Client, Future
client = Client()

def cella(i, j):
    return i * j

def cellb(i, j):
    sleep(10)
    return i + j

def cellc(i, j):
    sleep(10)
    return i - j

def celld(x):
    return 10/x

def celle(y):
    return 10/y

def cellf(z):
    return 10/z


def run_top(x, y, z):
    
    futf = client.submit(cellf, z)
    fute = client.submit(celle, y)
    futd = client.submit(celld, x)
    futc = client.submit(cellc, futd, fute)
    futb = client.submit(cellb, futd, futf)
    futa = client.submit(cella, futb, futc)
    
    try:
        result = futa.result()
        
    except ZeroDivisionError as exc:
        # potentially fix and resubmit error task identified in exc

        ix = [Future(key) for key,v in client.cluster.scheduler.tasks.items() if v.state == 'processing']
        
        for fut in distributed.as_completed(ix):
            print(fut,fut.result())
            
        raise exc
    else:
        return result

r = run_top(1,2,0)

<;未来:成品,类型:内置。浮动,键:cellc-339ad1ba637073781b981b964aca2337>;5.0


ZeroDivisionError回溯(最近一次调用上次)


Tags: keyclientreturndefas密钥futureresult