NIO errors when parsing an XML file
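I am writing a function in Jython that uses Popen to run another program. That program writes an XML file to its standard output, which I redirect to a file. When the process finishes, I close the file and call another function to parse it. While parsing, however, I get a lot of error messages that mention accessing a closed file and/or a malformed XML file (when I look at the files, they seem fine). I thought output.close() might be returning before the file was actually closed, so I added a loop that waits until output.closed is true. At first this seemed to work, but then my program printed the following: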
blasting
blasted
parsing
parsed
Extending genes found via genemark, 10.00% done
blasting
blasted
parsing
Exception in thread "_CouplerThread-7 (stdout)" Traceback (most recent call last):
  File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run
    self.write_func(buf)
IOError: java.nio.channels.AsynchronousCloseException
[Fatal Error] 17_2_corr.blastp.xml:15902:63: XML document structures must start and end within the same entity.
Retry
blasting
blasted
parsing
Exception in thread "_CouplerThread-9 (stdout)" Traceback (most recent call last):
  File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run
    self.write_func(buf)
IOError: java.nio.channels.ClosedChannelException
[Fatal Error] 17_2_corr.blastp.xml:15890:30: XML document structures must start and end within the same entity.
Retry
blasting
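I'm not sure where to go from here. Am I wrong in thinking that the XML has not been fully written by the time I parse it? If it really hasn't, how can I make sure that it has been written?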
def parseBlast(fileName):
    """
    A function for parsing XML blast output.
    """
    print "parsing"
    reader = XMLReaderFactory.createXMLReader()
    reader.entityResolver = reader.contentHandler = BlastHandler()
    reader.parse(fileName)
    print "parsed"
    return dict(map(lambda iteration: (iteration.query, iteration), reader.getContentHandler().iterations))

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
    """
    Performs a blast search using the blastp executable and database in blastLocation on
    the query with the eValue. The result is an XML file saved to fileName. If fileName
    already exists the search is skipped. If remote is true then the search is done remotely.
    """
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        print "blasting"
        blastProcess = subprocess.Popen(command,
                                        stdout = output)
        while blastProcess.poll() == None:
            if pipeline.exception:
                print "Stopping in blast"
                blastProcess.kill()
                output.close()
                raise pipeline.exception
        output.close()
        while not output.closed:
            pass
        print "blasted"
    try:
        return parseBlast(fileName)
    except SAXParseException:
        print 'Retry'
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
1 Answer
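I think the problem started when I switched from calling wait to calling poll, which I did so that I could stop the process while it was running. Because I already had results for many of my data sets, some time passed before I needed to launch the process again, which made it hard to tell where the problem came from. In any case, my guess is that the output was still being written when I closed the file. My solution was to switch to a pipe and write the file myself.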
def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
    """
    Performs a blast search using the blastp executable and database in blastLocation on
    the query with the eValue. The result is an XML file saved to fileName. If fileName
    already exists the search is skipped. If remote is true then the search is done remotely.
    """
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        blastProcess = subprocess.Popen(command,
                                        stdout = subprocess.PIPE)
        while blastProcess.poll() == None:
            output.write(blastProcess.stdout.read())
            if pipeline.exception:
                psProcess = subprocess.Popen(["ps", "aux"], stdout = subprocess.PIPE)
                awkProcess = subprocess.Popen(["awk", "/" + " ".join(command).replace("/", "\\/") + "/"], stdin = psProcess.stdout, stdout = subprocess.PIPE)
                for line in awkProcess.stdout:
                    subprocess.Popen(["kill", "-9", re.split(r"\s+", line)[1]])
                output.close()
                raise pipeline.exception
        remaining = blastProcess.stdout.read()
        while remaining:
            output.write(remaining)
            remaining = blastProcess.stdout.read()
        output.close()
    try:
        return parseBlast(fileName)
    except SAXParseException:
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
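
For comparison, the same pipe-and-copy idea can be sketched more compactly. This is only an illustration, written for CPython 3 rather than the Jython 2 used above, and the names run_to_file, should_stop and chunk_size are made up for the example (should_stop stands in for the pipeline.exception check). It reads fixed-size chunks until end of file, waits for the child to exit, and only then closes the file, so a parser called afterwards should not see a half-written document. The stop check only runs when a chunk has arrived, so it is not instantaneous.

```python
import subprocess


def run_to_file(command, file_name, should_stop=lambda: False, chunk_size=65536):
    """Run command, copy its stdout into file_name, and return the exit code.

    should_stop is a hypothetical callback (an assumption of this sketch);
    when it returns True the child is killed and RuntimeError is raised.
    """
    with open(file_name, "wb") as output:
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        try:
            while True:
                # An empty result means the child has closed its stdout (EOF).
                chunk = process.stdout.read(chunk_size)
                if not chunk:
                    break
                output.write(chunk)
                if should_stop():
                    process.kill()
                    raise RuntimeError("stopped before the child finished")
        finally:
            # Always release the pipe and reap the child, even on error.
            process.stdout.close()
            process.wait()
    # Leaving the with block has flushed and closed the output file.
    return process.returncode
```

Parsing would then happen only after run_to_file has returned, for example by calling parseBlast(fileName) when the exit code is 0. If the run is stopped, the partial file is left on disk and would need to be removed or overwritten by the caller.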