如何从python启动并动态连接到h2o集群实例?

2024-04-28 07:39:11 发布

您现在位置:Python中文网/ 问答频道 /正文

根据问题和后续答案here:当启动运行在hadoop集群上的h2o实例时(比如hadoop jar h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDir),用于连接到h2o实例的回调IP地址由hadoop运行时选择。所以在大多数情况下,IP地址和端口是由Hadoop运行时选择的,以找到最佳可用的

....
H2O node 172.18.4.63:54321 reports H2O cluster size 4
H2O node 172.18.4.67:54321 reports H2O cluster size 4
H2O cluster (4 nodes) is up
(Note: Use the -disown option to exit the driver after cluster formation)
Open H2O Flow in your web browser: http://172.18.4.67:54321
Connection url output line: Open H2O Flow in your web browser: http://172.18.4.67:54321

建议使用h2o的方法是每次您想使用它时启动和停止单个实例(抱歉,当前找不到支持文档)。这里的问题是,如果您希望python代码启动并自动连接到h2o实例,那么在h2o实例已经启动并运行之前,它不知道要连接到哪个IP。因此,在Hadoop上启动H2O集群的一个常见方法是让Hadoop决定集群,然后解析该行的输出

^{pr2}$

获取/提取IP地址。在

这里的问题是,h2o是一个阻塞进程,它的输出在实例启动时打印为文本行的,这使得我很难使用基本的pythonPopen逻辑获取最终的输出行。有没有一种方法可以在生成输出时捕获输出,从而获得具有连接IP的线路?在


Tags: the实例方法hadoopnodeoutputsize集群
1条回答
网友
1楼 · 发布于 2024-04-28 07:39:11

{I从一个线程开始搜索},然后我们使用一个线程}从一个线程返回到另一个线程},然后使用这个线程}返回一个进程。参见下面的示例。在

# startup hadoop h2o cluster
import shlex
import re

from Queue import Queue, Empty
from threading import Thread

def enqueue_output(out, queue):
    """
    Function for communicating streaming text lines from seperate thread.
    see https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
    """
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

# series of commands to run in-order for for bringing up the h2o cluster on demand
startup_cmds = [
    # remove any existing tmp log dir. for h2o processes
    'rm -r /some/location/for/h2odriver.jar/output',
    # start h2o on cluster
    '/bin/hadoop jar {}h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDir'.format("/local/h2o/start/path")
]

# clear legacy temp. dir.
if os.path.isdir(/some/location/for/h2odriver.jar/output):
    print subprocess.check_output(shlex.split(startup_cmds[0]))

# start h2o service in background thread
startup_p = subprocess.Popen(shlex.split(startup_cmds[1]), 
                             shell=False, 
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# setup message passing queue
q = Queue()
t = Thread(target=enqueue_output, args=(startup_p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# read line without blocking
h2o_url_out = ''
while True:
    try:  line = q.get_nowait() # or q.get(timeout=.1)
    except Empty:
        continue
    else: # got line
        print line
        # check for first instance connection url output
        if re.search("Open H2O Flow in your web browser", line) is not None:
            h2o_url_out = line
            break
        if re.search('Error', line) is not None:
            print 'Error generated: %s' % line
            sys.exit()

# capture connection IP from h2o process output
print 'Connection url output line: %s' % h2o_url_out
h2o_cnxn_ip = re.search("(?<=Open H2O Flow in your web browser: http:\/\/)(.*?)(?=:)", h2o_url_out).group(1)
print 'H2O connection ip: %s' % h2o_cnxn_ip

相关问题 更多 >