我在Dask调度程序上运行了很多慢任务,我需要每个任务的进度报告。任务将从处理进度报告的同一台机器上提交,因此可以保留在同一个进程中,但现在让我们假设任务是在不同的进程中提交的,进度报告是在不同的进程中处理的。你知道吗
Dask提供了Coordination Primitives,其预期用例包括能够监视进度:
These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.
我能想到的最简单的例子就是:
任务提交者:
from dask.distributed import Client, Pub
import time
c = Client('tcp://127.0.0.1:8786')
def slow_func():
q = Pub('progress')
for i in range(10):
q.put(f'{i}/10')
time.sleep(1)
c.submit(slow_func)
任务报告者:
from dask.distributed import Client, Sub
c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
print(q.get())
这适用于Pub
/Sub
,但也同样适用于Queue
。现在,尽管它确实有效,但似乎并不是作者所想的:
Client
;即Client
最终在工作节点上。这感觉很奇怪。你知道吗因此,我的问题是,无可否认有些含糊:至于创建一个Dask future提供进度报告的“Hello world”样式的示例,我将如何修改上述内容,使之成为可以被视为惯用的Dask,是否有任何缺陷需要注意?你知道吗
我可以通过为每个任务创建一个新的客户机(下面的示例)来部分地解决我的第一个问题,但是由于我最终得到的结果似乎是一样的,所以这样做也许是不必要的。你知道吗
import time
from dask.distributed import Client, Pub
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func():
c_report = Client('tcp://127.0.0.1:8786')
q = Pub('progress', client=c_report)
for i in range(10):
q.put(f'{i}/10')
time.sleep(1)
c_submit.submit(slow_func)
问题的第一部分由^{} 的存在来回答,它正是我们所需要的:提供一个与当前工作进程的调度器对话的客户机。这样,任务提交者就变成了:
对于第二部分,一个不可怕的方法是每次提交任务时简单地生成一个ID。也就是说,这样做:
这可以解决我的问题,但是当未来的钥匙里已经有了一个完全可用的ID时,使用一个新的ID仍然感觉有点奇怪。你知道吗
相关问题 更多 >
编程相关推荐