使用多进程工作者的Twisted网络客户端?
我有一个应用程序,它使用 Twisted 和 Stomper 作为 STOMP 客户端,并把工作分配给一个 multiprocessing.Pool 的工作者。
当我用一个 Python 脚本启动这个程序时,它似乎运行得还不错,简化后的代码大概是这样的:
# stompclient.py
logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)
# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()
# initialize the process pool. (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)
StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()
在准备部署的时候,我想利用 twistd 脚本,并通过一个 tac 文件来运行这个程序。
这是我看起来非常相似的 tac 文件:
# stompclient.tac
logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)
# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()
# initialize the process pool. (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)
StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
application = service.Application('myapp')
service = internet.TCPClient(host, port, StompClientFactory())
service.setServiceParent(application)
为了说明问题,我简化或修改了一些细节;希望这些不是问题的关键。例如,我的应用有一个插件系统,池是通过一个单独的方法初始化的,然后工作是通过 pool.apply_async() 方法分配给池的,传入的是我插件的 process() 方法。
所以,如果我运行这个脚本(stompclient.py),一切都按预期工作。
如果我以非守护进程模式(-n)运行 twist,似乎也能正常工作:
twistd -noy stompclient.tac
但是,当我以守护进程模式运行时,它就不行了:
twistd -oy stompclient.tac
应用程序看起来正常启动,但当它尝试分配工作时,就会卡住。这里的“卡住”是指子进程似乎从未被要求去做任何事情,而父进程(调用 pool.apply_async() 的那个)就在那里等着响应返回。
我知道我在使用 Twisted 和 multiprocessing 时可能做了什么傻事,但我真的希望有人能告诉我我的方法有什么问题。
提前谢谢大家!
2 个回答
给你一个可能的想法...
当在守护进程模式下运行时,twistd会关闭标准输入、标准输出和标准错误输出。你的客户端有没有什么操作是需要读或写这些的?
你提到的工作正常的调用和不正常的调用之间,唯一的区别就是“-n”这个选项,所以问题很可能是因为“守护进程化”这个过程造成的,而“-n”选项正是为了防止这个过程发生。
在POSIX系统中,守护进程化的一个步骤是“分叉”,也就是创建一个新的进程,然后让原来的进程退出。这样做的结果是,你的代码会在一个和.tac文件被评估时不同的进程中运行。同时,这也会改变在.tac文件中启动的进程的父子关系,比如你的多进程池中的进程。
多进程池的进程一开始是以你启动的twistd进程为父进程的。但是,当这个进程在守护进程化的过程中退出后,它们的父进程就变成了系统的初始化进程。这可能会引发一些问题,虽然可能不是你描述的那种挂起的问题。还有其他一些底层的实现细节,通常可以让多进程模块正常工作,但在守护进程化过程中会被打乱。
幸运的是,避免这种奇怪的相互影响应该很简单。Twisted的服务API允许你在守护进程化完成后再运行代码。如果你使用这些API,就可以把多进程模块的进程池初始化延迟到守护进程化之后,这样就有希望避免问题。下面是一个可能的示例:
from twisted.application.service import Service
class MultiprocessingService(Service):
def startService(self):
self.pool = multiprocessing.Pool(processes=processes)
MultiprocessingService().setServiceParent(application)
另外,你可能还会遇到与多进程模块的子进程清理相关的问题,或者可能与使用Twisted的进程创建API(reactor.spawnProcess)创建的进程有关。这是因为正确处理子进程通常涉及到处理SIGCHLD信号。不过,Twisted和多进程模块在这方面不会合作,所以其中一个会收到所有子进程退出的通知,而另一个则不会收到。如果你根本不使用Twisted的API来创建子进程,那么这可能没问题——但你可能需要检查一下多进程模块尝试安装的任何信号处理程序是否真的“胜出”,而不是被Twisted自己的处理程序替换掉。