如何在Python中异步处理XML?

7 投票
4 回答
3965 浏览
提问于 2025-04-15 18:10

我有一个很大的XML数据文件(超过160M)需要处理,听说用SAX/expat/pulldom解析比较合适。我想开一个线程来遍历这些节点,把需要处理的节点放到一个队列里,然后其他工作线程再从队列中取出下一个可用的节点进行处理。

我现在有以下代码(我知道应该加锁,后面会加)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

问题是,while循环里的代码只执行了一次,然后我连按ctrl-C都无法中断它。在处理较小的文件时,输出是正常的,但这似乎说明处理程序只有在文档完全解析后才会被调用,这样就失去了SAX解析器的意义。

我知道这可能是我自己的无知,但我看不出我哪里出错了。

另外,我也尝试过这样修改start_handler

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

但也没有效果。

4 个回答

1

我看到的唯一问题是,你在不同的线程中同时访问了 q,而没有进行锁定。这可真是个麻烦事——你可能会遇到 Python 解释器卡住的问题。:)

试试加锁,其实并不难:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

你看,其实很简单,我们只是创建了一个锁变量来保护我们的对象,每次使用对象之前先获取这个锁,完成任务后再释放这个锁。这样就能确保 q.append(name)print(q) 不会同时进行。


(在更新版本的 Python 中,还有一种 "with .... " 的写法,可以帮助你自动释放锁、关闭文件或进行其他常常被遗忘的清理工作。)

8

我对这个问题不是很确定。我猜调用ParseFile这个函数是阻塞的,只有解析的线程在运行,因为有个叫做GIL的东西。解决这个问题的一种方法是使用multiprocessing模块。这个模块本来就是为了处理队列而设计的。

你可以创建一个Process,并且可以给它传递一个Queue

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

我加了一个元素列表,目的是为了复制你原来的脚本。你最终的解决方案可能会用到get_nowait和一个Pool或者类似的东西。

8

你提到的 ParseFile 方法,会一次性把所有内容都处理掉,这样不适合你想要的 增量 解析!所以,建议你把文件分成小块,逐步喂给解析器,同时确保在处理的过程中可以把控制权交给其他线程,比如:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

这里的 time.sleep(0.0) 是 Python 的一种方式,表示“如果有其他线程准备好了,就把控制权让给它们”;关于 Parse 方法的详细信息可以在 这里 找到。

第二点是,别担心锁的问题!使用 Queue.Queue 就可以了,它本身就是线程安全的,几乎总是协调多个线程的最佳和最简单的方法。你只需要创建一个 Queue 实例 q,然后用 q.put(name) 把任务放进去,工作线程就可以在 q.get() 上等待,等着获取更多的工作,这样做超级简单!

(还有一些辅助策略可以用来协调工作线程的结束,比如当没有更多工作时,但最简单的方法是把它们设置为守护线程,这样它们就会在主线程结束时自动终止——具体可以参考 文档)。

撰写回答