报告Dask tas的进展

2024-04-25 22:50:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Dask调度程序上运行了很多慢任务,我需要每个任务的进度报告。任务将从处理进度报告的同一台机器上提交,因此可以保留在同一个进程中,但现在让我们假设任务是在不同的进程中提交的,进度报告是在不同的进程中处理的。你知道吗

Dask提供了Coordination Primitives,其预期用例包括能够监视进度:

These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.

我能想到的最简单的例子就是:

任务提交者:

from dask.distributed import Client, Pub
import time

c = Client('tcp://127.0.0.1:8786')

def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c.submit(slow_func)

任务报告者:

from dask.distributed import Client, Sub

c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())

这适用于Pub/Sub,但也同样适用于Queue。现在,尽管它确实有效,但似乎并不是作者所想的:

  • 我最终隐式地依赖于在执行报告时用于提交任务的Client;即Client最终在工作节点上。这感觉很奇怪。你知道吗
  • 我无法区分不同的任务;理想情况下,我可以在报告中使用未来之钥之类的东西。你知道吗

因此,我的问题是,无可否认有些含糊:至于创建一个Dask future提供进度报告的“Hello world”样式的示例,我将如何修改上述内容,使之成为可以被视为惯用的Dask,是否有任何缺陷需要注意?你知道吗

我可以通过为每个任务创建一个新的客户机(下面的示例)来部分地解决我的第一个问题,但是由于我最终得到的结果似乎是一样的,所以这样做也许是不必要的。你知道吗

import time
from dask.distributed import Client, Pub

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c_submit.submit(slow_func)

Tags: infromimportclienttime进程报告dask
1条回答
网友
1楼 · 发布于 2024-04-25 22:50:07

问题的第一部分由^{}的存在来回答,它正是我们所需要的:提供一个与当前工作进程的调度器对话的客户机。这样,任务提交者就变成了:

import time
from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{i}/10')
            time.sleep(1)
c_submit.submit(slow_func)

对于第二部分,一个不可怕的方法是每次提交任务时简单地生成一个ID。也就是说,这样做:

import time
import uuid

from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{task_id}: {i}/10')
            time.sleep(1)

c_submit.submit(slow_func, uuid.uuid4())

这可以解决我的问题,但是当未来的钥匙里已经有了一个完全可用的ID时,使用一个新的ID仍然感觉有点奇怪。你知道吗

相关问题 更多 >

    热门问题