dask-tensorflow
Interaction between Dask and TensorFlow
Start TensorFlow clusters from Dask
Example
Given a Dask cluster:
```python
from dask.distributed import Client

client = Client('scheduler-address:8786')
```
Get a TensorFlow cluster, specifying groups by name:
```python
from dask_tensorflow import start_tensorflow

tf_spec, dask_spec = start_tensorflow(client, ps=2, worker=4)

>>> tf_spec
{'worker': ['192.168.1.100:2222', '192.168.1.101:2222',
            '192.168.1.102:2222', '192.168.1.103:2222'],
 'ps': ['192.168.1.104:2222', '192.168.1.105:2222']}
```
This creates a `tensorflow.train.Server` on each Dask worker process and sets up a data-transfer queue on each worker. These are available directly as the `tensorflow_server` and `tensorflow_queue` attributes on the workers.
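To make the shape of the spec above concrete, here is an illustrative, library-free sketch of how addresses might be split into parameter-server and worker roles. The function `make_tf_spec` and its ps-first ordering are hypothetical stand-ins, not part of the dask-tensorflow API.

```python
# Hypothetical sketch: partition a list of worker addresses into a
# TensorFlow-style cluster spec dict like the tf_spec shown above.
def make_tf_spec(addresses, ps=2):
    """Assign the first `ps` addresses the parameter-server role and
    the remainder the worker role (ordering here is illustrative)."""
    return {'ps': addresses[:ps], 'worker': addresses[ps:]}

addresses = ['192.168.1.%d:2222' % i for i in range(100, 106)]
spec = make_tf_spec(addresses, ps=2)
# spec['ps'] holds two addresses; spec['worker'] holds the other four
```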
More complex workflows
Typically we set up long-running Dask tasks that obtain these servers and participate in general TensorFlow computations:
```python
from dask.distributed import worker_client

def ps_function(self):
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        tf_server.join()

ps_tasks = [client.submit(ps_function, workers=worker, pure=False)
            for worker in dask_spec['ps']]

def worker_function(self):
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        # ... use tensorflow as desired ...

worker_tasks = [client.submit(worker_function, workers=worker, pure=False)
                for worker in dask_spec['worker']]
```
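The submission pattern above can be sketched without a cluster using the standard library's thread pool: a couple of long-running "ps" tasks that block until released (standing in for `tf_server.join()`), plus several short "worker" tasks. The names `stop_event`, `ps_function`, and `worker_function` here are illustrative stand-ins, not dask-tensorflow code.

```python
# Library-free sketch of submitting long-running ps tasks alongside
# worker tasks, mimicking client.submit with a local thread pool.
from concurrent.futures import ThreadPoolExecutor
import threading

stop_event = threading.Event()

def ps_function():
    stop_event.wait()              # stand-in for tf_server.join()
    return 'ps done'

def worker_function(i):
    return 'worker %d done' % i    # stand-in for TensorFlow training

with ThreadPoolExecutor(max_workers=6) as pool:
    ps_tasks = [pool.submit(ps_function) for _ in range(2)]
    worker_tasks = [pool.submit(worker_function, i) for i in range(4)]
    results = [f.result() for f in worker_tasks]
    stop_event.set()               # workers finished; release the ps tasks
```

The key property mirrored here is that parameter-server tasks never finish on their own; something must signal them to stop once training completes.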
A simple and flexible approach is to have these functions block on the queue, feeding them data from Dask arrays, dataframes, etc.
```python
def worker_function(self):
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        queue = c.worker.tensorflow_queue
        while not stopping_condition():
            batch = queue.get()
            # train with batch
```
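A minimal, runnable sketch of the same block-on-a-queue pattern using only the standard library: the "worker" thread blocks on a queue and processes batches until it sees a sentinel, which plays the role of `stopping_condition()`. `STOP`, `worker_loop`, and the sum standing in for training are all illustrative.

```python
# Library-free sketch of a training loop that blocks on a queue.
import queue
import threading

STOP = object()                    # sentinel acting as the stopping condition
tensorflow_queue = queue.Queue()
seen = []

def worker_loop(q):
    while True:
        batch = q.get()            # blocks until a batch (or STOP) arrives
        if batch is STOP:
            break
        seen.append(sum(batch))    # stand-in for "train with batch"

t = threading.Thread(target=worker_loop, args=(tensorflow_queue,))
t.start()
for batch in ([1, 2], [3, 4]):
    tensorflow_queue.put(batch)
tensorflow_queue.put(STOP)
t.join()
# seen == [3, 7]
```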
We then dump chunks of numpy and pandas dataframes into these queues:
```python
from distributed.worker_client import get_worker

def dump_batch(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

import dask.dataframe as dd
df = dd.read_csv('hdfs:///path/to/*.csv')
# clean up dataframe as necessary
partitions = df.to_delayed()  # delayed pandas dataframes
client.map(dump_batch, partitions)
```
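The producer side can be sketched the same way: here the "partitions" are plain lists standing in for delayed pandas dataframes, and a local `dump_batch` puts each one on the shared queue, as `client.map(dump_batch, partitions)` would do across the cluster. All names here are illustrative stand-ins.

```python
# Library-free sketch of the producer side of the queue pattern.
import queue

tensorflow_queue = queue.Queue()

def dump_batch(batch):
    tensorflow_queue.put(batch)

partitions = [[1, 2], [3, 4], [5, 6]]  # stand-ins for dataframe partitions
for p in partitions:
    dump_batch(p)

# A consumer can now drain the queue in arrival (FIFO) order.
drained = [tensorflow_queue.get() for _ in range(len(partitions))]
# drained == partitions
```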