SGE集群上Java最大堆大小

2 投票
1 回答
851 浏览
提问于 2025-04-18 08:46

我有一个用Python写的流程,它会调用一些Java的程序。这个流程可以在两种模式下运行,一种是本地模式(在单个节点上),另一种是SGE集群模式。

当我选择集群模式时,日志里会出现这样的错误信息:

Invalid maximum heap size: -Xmx4g -jar
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

在本地模式下,没有任何错误,也没有问题。

我想知道,是什么原因导致了这样的错误呢?

我用来在本地或集群上运行任务的类如下:

class LocalJobManager(JobManager):
    def __init__(self):
        self.cmd_strs = []

    def add_job(self, cmd, cmd_args, **kwargs):
        cmd_str = ' '.join([cmd, ] + [str(x) for x in cmd_args])

        self.cmd_strs.append(cmd_str)

    def run_job(self, cmd, cmd_args, **kwargs):
        cmd_str = ' '.join([cmd, ] + [str(x) for x in cmd_args])

        self._run_cmd(cmd_str)

    def wait(self):
        for cmd_str in self.cmd_strs:
            self._run_cmd(cmd_str)

    def _run_cmd(self, cmd_str):
        '''
        Throw exception if run command fails
        '''
        process = subprocess.Popen(cmd_str, stdin=subprocess.PIPE, shell=True)

        process.stdin.close()

        sts = os.waitpid(process.pid, 0)

        if sts[1] != 0:
            raise Exception('Failed to run {0}\n'.format(cmd_str))

class ClusterJobManager(JobManager):
    def __init__(self, log_dir=None):
        import drmaa

        self._drmaa = drmaa

        self.log_dir = log_dir

        if self.log_dir is not None:
            make_directory(self.log_dir)

        self.session = self._drmaa.Session()

        self.session.initialize()

        self.job_ids = Queue()

        self._lock = threading.Lock()

    def add_job(self, cmd, cmd_args, mem=4, max_mem=10, num_cpus=1):
        job_id = self._run_job(cmd, cmd_args, mem, max_mem, num_cpus)

        self.job_ids.put(job_id)

    def run_job(self, cmd, cmd_args, mem=4, max_mem=10, num_cpus=1):
        job_id = self._run_job(cmd, cmd_args, mem, max_mem, num_cpus)

        self._check_exit_status(job_id)

    def wait(self):
        self._lock.acquire()

        job_ids = []

        while not self.job_ids.empty():
            job_ids.append(self.job_ids.get())

        self.session.synchronize(job_ids, self._drmaa.Session.TIMEOUT_WAIT_FOREVER, False)

        self._lock.release()

        for job_id in job_ids:
            self._check_exit_status(job_id)

    def close(self):
        self.session.control(self._drmaa.Session.JOB_IDS_SESSION_ALL, self._drmaa.JobControlAction.TERMINATE)

        self.session.exit()

    def _run_job(self, cmd, cmd_args, mem, max_mem, num_cpus):
        job_template = self._init_job_template(cmd, cmd_args, mem, max_mem, num_cpus)

        job_id = self.session.runJob(job_template)

        self.session.deleteJobTemplate(job_template)

        return job_id

    def _init_job_template(self, cmd, cmd_args, mem, max_mem, num_cpus):
        job_template = self.session.createJobTemplate()

        job_template.remoteCommand = cmd

        job_template.args = [str(x) for x in cmd_args]

        job_template.workingDirectory = os.getcwd()

        if self.log_dir is not None:
            job_template.errorPath = ':' + self.log_dir

            job_template.outputPath = ':' + self.log_dir

        job_template.nativeSpecification = '-l mem_free={mem}G,mem_token={mem}G,h_vmem={max_mem}G -V -w n -pe ncpus {num_cpus}'.format(**locals())

        return job_template

    def _check_exit_status(self, job_id):
        return_value = self.session.wait(job_id, self._drmaa.Session.TIMEOUT_WAIT_FOREVER)

        if return_value.exitStatus != 0:
            raise Exception('Job {0} failed with exit status {1}.'.format(return_value.jobId,
                                                                          return_value.exitStatus))

通常,无法创建Java虚拟机(我在一些论坛上看到的)是由于语法错误造成的,尽管调用的命令是正确的,并且在本地可以正常工作。而且,上面提到的在集群上运行任务的类,除了Java之外,其他都能正常运行。

谢谢。

1 个回答

4

我在SGE上遇到了这个问题。你可能有一个默认的硬性内存限制,大约是4GB,而Java在初始化时似乎使用的内存超过了你在-Xmx4g参数中设置的4GB。你可以看看你的管理员是否设置了硬性内存限制吗?通常,你可以通过以下方式来设置或覆盖默认限制:

qsub -l h_vmem=16G

试着通过这个参数给出比实际需要的内存多得多的值,看看这样是否能解决问题,然后再逐渐降低h_vmem的值,直到接近崩溃的边缘。

撰写回答