Python: deferToThread XMLRPC 服务器 - Twisted - Cherrypy?
这个问题和我之前在这里问的几个问题有关,主要是关于如何在内存中对大量数据进行排序。
简单来说,我想要的/现在拥有的是:
一个正在运行的Twisted XMLRPC服务器。这个服务器在内存中保持着32个Foo类的实例。每个Foo类里面都有一个叫bar的列表(这个列表里会有几百万条记录)。有一个服务会从数据库中获取数据,然后把这些数据传递给XMLRPC服务器。数据基本上是一个字典,字典的键对应每个Foo实例,而值则是一个字典的列表,像这样:
data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}
然后,每个Foo实例会接收到与其键对应的值,并且Foo.bar字典会被更新和排序。
class XMLRPCController(xmlrpc.XMLRPC):
def __init__(self):
...
self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
...
def update(self, data):
for k, v in data:
threads.deferToThread(self.foos[k].processData, v)
def getData(self, fookey):
# return first 10 records of specified Foo.bar
return self.foos[fookey].bar[0:10]
class Foo():
def __init__(self):
bar = []
def processData(self, new_bar_data):
for record in new_bar_data:
# do processing, and add record, then sort
# BUNCH OF PROCESSING CODE
self.bar.sort(reverse=True)
问题是,当在XMLRPCController中调用更新函数时,如果记录数量很多(比如超过10万条),它会停止响应我的getData调用,直到所有32个Foo实例完成process_data方法。我原以为deferToThread可以解决这个问题,但我觉得我可能误解了问题所在。
有没有什么建议……我也愿意尝试其他的东西,比如Cherrypy,只要它支持我需要的功能。
编辑
@Troy:这是反应器的设置方式
reactor.listenTCP(port_no, server.Site(XMLRPCController)
reactor.run()
关于全局解释器锁(GIL),把sys.setcheckinterval()的值改得小一些,是否是个可行的选择,这样数据的锁就能释放,方便读取?
2 个回答
我不知道你的processData方法运行多久,也不知道你是怎么设置twisted的反应器的。默认情况下,twisted的反应器有一个线程池,线程数量在0到10之间。你可能试图把多达32个长时间运行的计算分配给最多10个线程,这样做效果不好。
你还需要考虑一下全局解释器锁(GIL)在更新这些集合时起了什么作用。
编辑:在你对程序做任何重大改动之前(比如调用sys.setcheckinterval()
),最好先用性能分析工具或者python的trace模块运行一下。这样可以告诉你哪些方法消耗了你所有的时间。没有正确的信息,你就无法做出正确的改动。
让应用程序变得更灵活的最简单方法就是把那些占用CPU资源的处理分成小块,然后在这些小块之间让反应器继续运行。比如,你可以使用reactor.callLater(0, process_next_chunk)来处理下一小块。这样实际上就是自己实现了一种合作式的多任务处理。
另一种方法是使用单独的进程来完成工作,这样你就可以利用多个核心的优势。可以看看Ampoule:https://launchpad.net/ampoule,它提供了一个类似于deferToThread的API。