Python: deferToThread XMLRPC 服务器 - Twisted - Cherrypy?

-1 投票
2 回答
1510 浏览
提问于 2025-04-15 18:54

这个问题和我之前在这里问的几个问题有关,主要是关于如何在内存中对大量数据进行排序。

简单来说,我想要的/现在拥有的是:

一个正在运行的Twisted XMLRPC服务器。这个服务器在内存中保持着32个Foo类的实例。每个Foo类里面都有一个叫bar的列表(这个列表里会有几百万条记录)。有一个服务会从数据库中获取数据,然后把这些数据传递给XMLRPC服务器。数据基本上是一个字典,字典的键对应每个Foo实例,而值则是一个字典的列表,像这样:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

然后,每个Foo实例会接收到与其键对应的值,并且Foo.bar字典会被更新和排序。

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
        ...

    def update(self, data):
        for k, v in data:
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo():

    def __init__(self):
        bar = []

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

问题是,当在XMLRPCController中调用更新函数时,如果记录数量很多(比如超过10万条),它会停止响应我的getData调用,直到所有32个Foo实例完成process_data方法。我原以为deferToThread可以解决这个问题,但我觉得我可能误解了问题所在。

有没有什么建议……我也愿意尝试其他的东西,比如Cherrypy,只要它支持我需要的功能。


编辑

@Troy:这是反应器的设置方式

reactor.listenTCP(port_no, server.Site(XMLRPCController)
reactor.run()

关于全局解释器锁(GIL),把sys.setcheckinterval()的值改得小一些,是否是个可行的选择,这样数据的锁就能释放,方便读取?

2 个回答

0

我不知道你的processData方法运行多久,也不知道你是怎么设置twisted的反应器的。默认情况下,twisted的反应器有一个线程池,线程数量在0到10之间。你可能试图把多达32个长时间运行的计算分配给最多10个线程,这样做效果不好。

你还需要考虑一下全局解释器锁(GIL)在更新这些集合时起了什么作用。

编辑:在你对程序做任何重大改动之前(比如调用sys.setcheckinterval()),最好先用性能分析工具或者python的trace模块运行一下。这样可以告诉你哪些方法消耗了你所有的时间。没有正确的信息,你就无法做出正确的改动。

1

让应用程序变得更灵活的最简单方法就是把那些占用CPU资源的处理分成小块,然后在这些小块之间让反应器继续运行。比如,你可以使用reactor.callLater(0, process_next_chunk)来处理下一小块。这样实际上就是自己实现了一种合作式的多任务处理。

另一种方法是使用单独的进程来完成工作,这样你就可以利用多个核心的优势。可以看看Ampoule:https://launchpad.net/ampoule,它提供了一个类似于deferToThread的API。

撰写回答