NIO errors when parsing an XML file

Asked 2025-04-16 17:37

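I'm writing a function in Jython that uses Popen to run another program. That program writes an XML file to its standard output, which I redirect to a file. When the process finishes, I close the file and call another function to parse it. During parsing I get a lot of errors about accessing a closed file and/or malformed XML (the files look fine when I inspect them). I suspected that output.close() was returning before the file was actually closed, so I added a loop that waits until output.closed is true. That seemed to work at first, but then my program printed the following: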

blasting  
blasted  
parsing  
parsed  
    Extending genes found via genemark, 10.00% done  
blasting  
blasted  
parsing  
Exception in thread "_CouplerThread-7 (stdout)" Traceback (most recent call last):  
  File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run  
    self.write_func(buf)  
IOError: java.nio.channels.AsynchronousCloseException  
[Fatal Error] 17_2_corr.blastp.xml:15902:63: XML document structures must start and end within the same entity.  
Retry  
blasting  
blasted  
parsing  
Exception in thread "_CouplerThread-9 (stdout)" Traceback (most recent call last):  
  File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run  
    self.write_func(buf)  
IOError: java.nio.channels.ClosedChannelException  
[Fatal Error] 17_2_corr.blastp.xml:15890:30: XML document structures must start and end within the same entity.  
Retry  
blasting  

I'm not sure what to do next. Is the XML really not fully written by the time I parse it? If so, how can I make sure that it has been?

def parseBlast(fileName):
  """
  A function for parsing XML blast output.
  """
  print "parsing"
  reader = XMLReaderFactory.createXMLReader()
  reader.entityResolver = reader.contentHandler = BlastHandler()
  reader.parse(fileName)
  print "parsed"

  return dict(map(lambda iteration: (iteration.query, iteration), reader.getContentHandler().iterations))

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
  """
  Performs a blast search using the blastp executable and database in blastLocation on
  the query with the eValue.  The result is an XML file saved to fileName.  If fileName
  already exists the search is skipped.  If remote is true then the search is done remotely.
  """
  if not os.path.isfile(fileName) or force:
    output = open(fileName, "w")
    command = [blastLocation + "/bin/blastp",
               "-evalue", str(eValue),
               "-outfmt", "5",
               "-query", query]
    if remote:
      command += ["-remote",
                  "-db", database]
    else:
      command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                  "-db", database]
    print "blasting"
    blastProcess = subprocess.Popen(command,
                                      stdout = output)
    while blastProcess.poll() == None:
      if pipeline.exception:
        print "Stopping in blast"
        blastProcess.kill()
        output.close()
        raise pipeline.exception
    output.close()
    while not output.closed:
      pass
    print "blasted"
  try:
    return parseBlast(fileName)
  except SAXParseException:
    print 'Retry'
    return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)

1 Answer

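I think the problem started when I switched from wait() to poll() so that I could stop the process while it was running. Since I already had results for many data sets, it was a while before I needed to launch the process again, which made it hard to tell where the problem came from. In any case, my guess is that the output was still being written when I closed the file. My solution was to switch to a pipe and write the file myself.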

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
  """
  Performs a blast search using the blastp executable and database in blastLocation on
  the query with the eValue. The result is an XML file saved to fileName. If fileName
  already exists the search is skipped. If remote is true then the search is done remotely.
  """
  if not os.path.isfile(fileName) or force:
    output = open(fileName, "w")
    command = [blastLocation + "/bin/blastp",
               "-evalue", str(eValue),
               "-outfmt", "5",
               "-query", query]
    if remote:
      command += ["-remote",
                  "-db", database]
    else:
      command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                  "-db", database]
    blastProcess = subprocess.Popen(command,
                                    stdout = subprocess.PIPE)
    while blastProcess.poll() == None:
      output.write(blastProcess.stdout.read())
      if pipeline.exception:
        psProcess = subprocess.Popen(["ps", "aux"], stdout = subprocess.PIPE)
        awkProcess = subprocess.Popen(["awk", "/" + " ".join(command).replace("/", "\\/") + "/"], stdin = psProcess.stdout, stdout = subprocess.PIPE)
        for line in awkProcess.stdout:
          subprocess.Popen(["kill", "-9", re.split(r"\s+", line)[1]])
        output.close()
        raise pipeline.exception
    remaining = blastProcess.stdout.read()
    while remaining:
      output.write(remaining)
      remaining = blastProcess.stdout.read()

    output.close()

  try:
    return parseBlast(fileName)
  except SAXParseException:
    return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
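
A minimal sketch of the same pipe-based idea, not the code used above: a background thread copies the child's standard output to the file in chunks, and the file is closed only after that thread has seen end of file. The names run_to_file, should_stop, and poll_interval are hypothetical; should_stop stands in for the pipeline.exception check. It calls Popen.kill() in place of the ps/awk/kill pipeline, on the assumption that kill() is enough to terminate the child on the platform in use. Because read() with no size argument normally blocks until end of file, reading in a separate thread also keeps the polling loop free to check for cancellation.

```python
import subprocess
import sys
import threading
import time


def run_to_file(command, file_name, should_stop=lambda: False, poll_interval=0.1):
    """
    Runs command, copies its standard output to file_name, and returns the exit code.
    should_stop is a hypothetical callback standing in for the pipeline.exception check.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    output = open(file_name, "wb")

    def copy_output():
        # read(8192) returns at most 8192 bytes and an empty string only at end of file.
        while True:
            chunk = process.stdout.read(8192)
            if not chunk:
                break
            output.write(chunk)

    copier = threading.Thread(target=copy_output)
    copier.start()
    try:
        while process.poll() is None:
            if should_stop():
                # Assumption: kill() is sufficient to stop the child process.
                process.kill()
                break
            time.sleep(poll_interval)
        process.wait()
    finally:
        # Close the file only after the copier thread has reached end of file.
        copier.join()
        process.stdout.close()
        output.close()
    return process.returncode
```

With the command list built as in cachedBlast, a call might look like run_to_file(command, fileName, lambda: pipeline.exception is not None). Checking that the returned exit code is 0 before calling parseBlast avoids parsing output from a run that was killed or that failed.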
