如何在Python中异步处理XML?
我有一个很大的XML数据文件(超过160M)需要处理,听说用SAX/expat/pulldom解析比较合适。我想开一个线程来遍历这些节点,把需要处理的节点放到一个队列里,然后其他工作线程再从队列中取出下一个可用的节点进行处理。
我现在有以下代码(我知道应该加锁,后面会加)
import sys, time
import xml.parsers.expat
import threading
q = []
def start_handler(name, attrs):
q.append(name)
def do_expat():
p = xml.parsers.expat.ParserCreate()
p.StartElementHandler = start_handler
p.buffer_text = True
print("opening {0}".format(sys.argv[1]))
with open(sys.argv[1]) as f:
print("file is open")
p.ParseFile(f)
print("parsing complete")
t = threading.Thread(group=None, target=do_expat)
t.start()
while True:
print(q)
time.sleep(1)
问题是,while
循环里的代码只执行了一次,然后我连按ctrl-C都无法中断它。在处理较小的文件时,输出是正常的,但这似乎说明处理程序只有在文档完全解析后才会被调用,这样就失去了SAX解析器的意义。
我知道这可能是我自己的无知,但我看不出我哪里出错了。
另外,我也尝试过这样修改start_handler
:
def start_handler(name, attrs):
def app():
q.append(name)
u = threading.Thread(group=None, target=app)
u.start()
但也没有效果。
4 个回答
我看到的唯一问题是,你在不同的线程中同时访问了 q
,而没有进行锁定。这可真是个麻烦事——你可能会遇到 Python 解释器卡住的问题。:)
试试加锁,其实并不难:
import sys, time
import xml.parsers.expat
import threading
q = []
q_lock = threading.Lock() <---
def start_handler(name, attrs):
q_lock.acquire() <---
q.append(name)
q_lock.release() <---
def do_expat():
p = xml.parsers.expat.ParserCreate()
p.StartElementHandler = start_handler
p.buffer_text = True
print("opening {0}".format(sys.argv[1]))
with open(sys.argv[1]) as f:
print("file is open")
p.ParseFile(f)
print("parsing complete")
t = threading.Thread(group=None, target=do_expat)
t.start()
while True:
q_lock.acquire() <---
print(q)
q_lock.release() <---
time.sleep(1)
你看,其实很简单,我们只是创建了一个锁变量来保护我们的对象,每次使用对象之前先获取这个锁,完成任务后再释放这个锁。这样就能确保 q.append(name)
和 print(q)
不会同时进行。
(在更新版本的 Python 中,还有一种 "with .... " 的写法,可以帮助你自动释放锁、关闭文件或进行其他常常被遗忘的清理工作。)
我对这个问题不是很确定。我猜调用ParseFile这个函数是阻塞的,只有解析的线程在运行,因为有个叫做GIL的东西。解决这个问题的一种方法是使用multiprocessing
模块。这个模块本来就是为了处理队列而设计的。
你可以创建一个Process
,并且可以给它传递一个Queue
:
import sys, time
import xml.parsers.expat
import multiprocessing
import Queue
def do_expat(q):
p = xml.parsers.expat.ParserCreate()
def start_handler(name, attrs):
q.put(name)
p.StartElementHandler = start_handler
p.buffer_text = True
print("opening {0}".format(sys.argv[1]))
with open(sys.argv[1]) as f:
print("file is open")
p.ParseFile(f)
print("parsing complete")
if __name__ == '__main__':
q = multiprocessing.Queue()
process = multiprocessing.Process(target=do_expat, args=(q,))
process.start()
elements = []
while True:
while True:
try:
elements.append(q.get_nowait())
except Queue.Empty:
break
print elements
time.sleep(1)
我加了一个元素列表,目的是为了复制你原来的脚本。你最终的解决方案可能会用到get_nowait
和一个Pool
或者类似的东西。
你提到的 ParseFile
方法,会一次性把所有内容都处理掉,这样不适合你想要的 增量 解析!所以,建议你把文件分成小块,逐步喂给解析器,同时确保在处理的过程中可以把控制权交给其他线程,比如:
while True:
data = f.read(BUFSIZE)
if not data:
p.Parse('', True)
break
p.Parse(data, False)
time.sleep(0.0)
这里的 time.sleep(0.0)
是 Python 的一种方式,表示“如果有其他线程准备好了,就把控制权让给它们”;关于 Parse
方法的详细信息可以在 这里 找到。
第二点是,别担心锁的问题!使用 Queue.Queue 就可以了,它本身就是线程安全的,几乎总是协调多个线程的最佳和最简单的方法。你只需要创建一个 Queue
实例 q
,然后用 q.put(name)
把任务放进去,工作线程就可以在 q.get()
上等待,等着获取更多的工作,这样做超级简单!
(还有一些辅助策略可以用来协调工作线程的结束,比如当没有更多工作时,但最简单的方法是把它们设置为守护线程,这样它们就会在主线程结束时自动终止——具体可以参考 文档)。