Python的multiprocessing与twisted的反应器

5 投票
2 回答
5623 浏览
提问于 2025-04-15 14:04

我正在开发一个xmlrpc服务器,它需要定期执行某些任务。我使用twisted作为xmlrpc服务的核心,但遇到了一些小问题:

class cemeteryRPC(xmlrpc.XMLRPC):

    def __init__(self, dic):
        xmlrpc.XMLRPC.__init__(self)


    def xmlrpc_foo(self):
        return 1


    def cycle(self):
        print "Hello"
        time.sleep(3)


class cemeteryM( base ):

    def __init__(self, dic):   # dic is for cemetery
        multiprocessing.Process.__init__(self)
        self.cemRPC = cemeteryRPC()


    def run(self):
        # Start reactor on a second process
        reactor.listenTCP( c.PORT_XMLRPC, server.Site( self.cemRPC ) )
        p = multiprocessing.Process( target=reactor.run )
        p.start()

        while not self.exit.is_set():
            self.cemRPC.cycle()
            #p.join()


if __name__ == "__main__":

    import errno
    test = cemeteryM()
    test.start()

    # trying new method
    notintr = False
    while not notintr:
        try:
            test.join()
            notintr = True 
        except OSError, ose:
            if ose.errno != errno.EINTR:
                raise ose
        except KeyboardInterrupt:
            notintr = True

我应该如何将这两个进程结合起来,以便它们各自的连接不会阻塞?

(我对“连接”这个词感到很困惑。为什么它会阻塞?我在网上搜索过,但找不到很多有用的解释。有人能给我解释一下吗?)

祝好

2 个回答

3

嘿,asdvawev,.join()在多进程中和在多线程中是一样的工作方式——它是一个阻塞调用,主线程会等待工作线程关闭。如果工作线程一直不关闭,那么.join()就永远不会返回。例如:

class myproc(Process):
    def run(self):
        while True:
            time.sleep(1)

在这个情况下调用run意味着join()永远不会返回。为了防止这种情况,我通常会使用一个Event()对象传递给子进程,这样我就可以给子进程发信号,让它知道什么时候退出:

class myproc(Process):
    def __init__(self, event):
        self.event = event
        Process.__init__(self)
    def run(self):
        while not self.event.is_set():
            time.sleep(1)

另外,如果你的工作是放在一个队列里的,你可以让子进程从队列中取任务,直到遇到一个特殊的标记(通常是队列中的None项),然后再关闭。

这两种方法的意思是,在调用.join()之前,你可以设置事件或者插入特殊标记,当调用join()时,进程会完成当前的任务,然后正常退出。

11

你真的需要把Twisted放在一个单独的进程中运行吗?我觉得这听起来有点不寻常。

试着把Twisted的反应器(Reactor)想象成你的主循环,然后把你需要的所有东西都挂在上面,而不是试图把Twisted当作一个后台任务来运行。

更常见的做法是使用Twisted的.callLater方法,或者在反应器中添加一个LoopingCall对象。

比如:

from twisted.web import xmlrpc, server
from twisted.internet import task
from twisted.internet import reactor

class Example(xmlrpc.XMLRPC):          
    def xmlrpc_add(self, a, b):
        return a + b

    def timer_event(self):
        print "one second"

r = Example()
m = task.LoopingCall(r.timer_event)
m.start(1.0)

reactor.listenTCP(7080, server.Site(r))
reactor.run()

撰写回答