我有一个相当复杂的基于socket(ZeroMQ-REQ/REP
)的python程序,我想通过在同一台机器上运行一个简单的socket脚本来验证它是否正常工作。在
测试脚本就是这样的。在
import subprocess
import zmq
import json
# ...
for call, response in zip(test_calls, expected_responses):
p = subprocess.Popen(['python', 'main.py'], stdout=subprocess.PIPE)
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.connect("tcp://localhost:8084")
socket.send_string(json.dumps(call))
r = json.loads(socket.recv_string())
assert r == response
p.terminate()
socket.close()
(可能值得注意的是,它实际上是在noes2中使用这样的测试来实现的,但是我觉得这超出了这个问题的范围,而且会使示例复杂化。这在很大程度上总结了测试中所发生的事情)。在
85%的时间,这会奏效,一切都会过去的。哇哦!另外15%的时间,我在r = json.loads(socket.recv_string())
行得到一个zmq.error.Again: Resource temporarily unavailable
(如果我没有设置zmq.RCVTIMEO
,它就会挂起)。在
我想知道这是否是一个计时过程(子进程不能及时启动/停止),我在这个地方打了几个time.sleep()
调用,但它似乎什么也没做。在
{{cd6>print
语句,它们通过套接字打印到stdout
上的每个调用和响应,但它没有接收任何输入,因此{
我以前从未在zmq中遇到过这种问题,所以我认为这可能与子进程的使用有关。有没有人知道可能是什么问题以及如何解决它?在
谢谢。在
更新:所以看起来进程没有终止(尽管在主应用程序中使用了signal.signal(signal.SIGTERM, close_app)
信号)。这是否会导致有关通过zmq与之通信的活动进程的混乱?最初调用p.kill()
而不是p.terminate()
似乎起到了作用,尽管仍然以同样的方式失败了一两次。在
更新2:似乎正在工作的东西正在直接调用命令kill
:
在大多数情况下,它似乎优雅地关闭了它。在
有什么问题吗
可能与所述子进程中未发布的代码有关,这会导致在子进程强制终止期间启动的观察到的行为(包括它的其他资源,在一个智能且功能非常丰富的多线程
zmq.Context( n_IO_threads = 1 )
实例中管理,看不见&;在有限的先验编码/执行控制中)。在考虑一下
{SIGTERM|SIGKILL|...}
而不是紧急刹车,紧急按钮,
在分布式系统设计中不是一个明智的解决方案
一旦进入分布式系统设计,人们应该忘记使用类似于
SIGTERM
等的无上下文工具,但最好将自己的软信号控制平面纳入新设计的分布式系统基础设施中。在这有助于“远程”代理根据此类软信号的实际上下文进行操作,并允许(在您的完全算法控制下)执行所有必要的安全保护、资源清理和终止前的职责,以便最终优雅地清理退出。在
在这一点上,我可能听起来有些过时,但在代码最终指示所有
zmq.Context()
实例到.term()
之前,总是将套接字显式地指示为.term()
。据报道,这是不必要的,但在分布式系统的设计/实现中,做到资源处理的干净和公平是一项公平的职责。在没有例外,没有借口。在
该死的忘了0在
ZMQ_LINGER
一个值得一提的例子是
ZeroMQ API
参数的默认值,如果没有设置,则默认值为0
,这意味着一旦这样的ZeroMQ
-socket实例(显式或隐式)被指示到.close()
,并且还有一个ZMQ_LINGER == 0
,套接字端点将BLOCK
直到来自交易对手缓冲区的所有消息被传递,这可能导致您的分布式处理挂起,而没有任何机会在事后解决此类死锁,如果没有正确地预先设置,则不会永远等待挂起的消息。在一个更新的
pyzmq
文档明确警告不要.destroy()
一个zmq.Context
实例(并且盲目地让一个权威发布的.destroy()
-d获取套接字.close()
-d,这是一个排除了自己代码控件的权限)因此,甚至还有一些理由不依赖
SIGTERM
魔鬼的服务。在正在使用的端口
另外,释放占用的传输类资源需要一些时间。因此,拥有一个刚刚发布了一个
IP:port
的代码并不意味着另一个实例/进程/线程可以直接跳入并捕捉同一个端口,而不存在与O/S相关的延迟。相反,在这方面也要检查一下你的资源重用/释放策略(我敢冒这个区域阻塞的风险,用一些端口地址池来轮换和排队,这样至少可以推迟任何潜在的重用情况,在一个合理的O/S相关的延迟完全过期之前,IMHO阻止阻塞状态比事后处理阻塞状态上的异常要好得多)。在.bind()
之前.connect()
是另一个这样的问题。一旦您的
subprocess.Popen(...)
启动,在O/S服务启动并使子进程开始独立呼吸之前需要一段时间。在如果已经处于活动状态并正在执行的第一个进程已到达
.connect()
,则在派生的子进程实例到达.bind()
之前,分布式系统将阻塞。在安装/拆卸往返时间不可减少到零。资源不是一次性的。有一些与系统相关的维护和共享开销与它们的使用有关。在
最后
.recv_string()
可能并确实引发了ZMQError EAGAIN
在某些情况下,还没有任何消息r已经在本地节点中通过任何
.recv*()
方法获取它,无论它是flags = zmq.NOBLOCK
模式下的{.recv|.recv_string|.recv_json|&al}
。在相关问题 更多 >
编程相关推荐