dask与tensorflow的相互作用

dask-tensorflow的Python项目详细描述


从DASK启动TensorFlow群集

示例

给定一个dask集群

fromdask.distributedimportClientclient=Client('scheduler-address:8786')

获取tensorflow集群,按名称指定组

fromdask_tensorflowimportstart_tensorflowtf_spec,dask_spec=start_tensorflow(client,ps=2,worker=4)>>>tf_spec{'worker':['192.168.1.100:2222','192.168.1.101:2222','192.168.1.102:2222','192.168.1.103:2222'],'ps':['192.168.1.104:2222','192.168.1.105:2222']}

这将在每个dask工作进程上创建一个tensorflow.train.Server,并设置 每个工作机上的数据传输队列。这些可以直接作为 ^工人的{tt2}$和tensorflow_queue属性。

更复杂的工作流程

通常,我们会设置长时间运行的dask任务来获取这些服务器和 参与一般的张量流计算。

fromdask.distributedimportworker_clientdefps_function(self):withworker_client()asc:tf_server=c.worker.tensorflow_servertf_server.join()ps_tasks=[client.submit(ps_function,workers=worker,pure=False)forworkerindask_spec['ps']]defworker_function(self):withworker_client()asc:tf_server=c.worker.tensorflow_server# ... use tensorflow as desired ...worker_tasks=[client.submit(worker_function,workers=worker,pure=False)forworkerindask_spec['worker']]

一种简单而灵活的方法是让这些功能在队列中阻塞。 向他们提供来自DASK阵列、数据帧等的数据。

defworker_function(self):withworker_client()asc:tf_server=c.worker.tensorflow_serverqueue=c.worker.tensorflow_queuewhilenotstopping_condition():batch=queue.get()# train with batch

然后将numpy和pandas数据帧块转储到这些队列中

fromdistributed.worker_clientimportget_workerdefdump_batch(batch):worker=get_worker()worker.tensorflow_queue.put(batch)importdask.dataframeasdddf=dd.read_csv('hdfs:///path/to/*.csv')# clean up dataframe as necessarypartitions=df.to_delayed()# delayed pandas dataframesclient.map(dump_batch,partitions)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
用户界面java,使用gui连接到另一台计算机/服务器的文件系统   运行sbt的ubuntu返回错误:“javahome需要<path>参数”   java如何在Android中处理许多ImageView而不出现内存问题?   查询中非法字符的java相同URL失败   安卓取消引用可能会产生“java”。lang.NullPointerException'   java中的indexoutofboundsexception“java.lang.ArrayIndexOutOfBoundsException”错误   xml Java将dom保存到文件>文件在程序结束后由另一个进程打开   Java的垃圾收集器是如何工作的?   Java如何筛选值(列表)   java处理字符串我怎样才能像在真实的书籍中一样在上面部分生成“小数字”呢?   java SonarQube是否有一个API来获取所有项目分析的一部分?   java startActivity(intent)什么都不做   JAVAutil。扫描器类Java   java如何从Firebase更新电子邮件?UpdateMail方法已被弃用   java Hibernate。如何正确组织带有注释的onetomany关系?   在java中获得卷标和驱动器号之间的映射(而不是FileSystemView)的解决方法是什么   java查找文件的路径