zmq连接到用子进程打开的脚本

2024-04-26 13:51:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个相当复杂的基于socket(ZeroMQ-REQ/REP)的python程序,我想通过在同一台机器上运行一个简单的socket脚本来验证它是否正常工作。在

测试脚本就是这样的。在

import subprocess
import zmq
import json

# ...

for call, response in zip(test_calls, expected_responses):
    p = subprocess.Popen(['python', 'main.py'], stdout=subprocess.PIPE)

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.RCVTIMEO, 1000)
    socket.connect("tcp://localhost:8084")

    socket.send_string(json.dumps(call))
    r = json.loads(socket.recv_string())

    assert r == response

    p.terminate()
    socket.close()

(可能值得注意的是,它实际上是在noes2中使用这样的测试来实现的,但是我觉得这超出了这个问题的范围,而且会使示例复杂化。这在很大程度上总结了测试中所发生的事情)。在

85%的时间,这会奏效,一切都会过去的。哇哦!另外15%的时间,我在r = json.loads(socket.recv_string())行得到一个zmq.error.Again: Resource temporarily unavailable(如果我没有设置zmq.RCVTIMEO,它就会挂起)。在

我想知道这是否是一个计时过程(子进程不能及时启动/停止),我在这个地方打了几个time.sleep()调用,但它似乎什么也没做。在

{{cd6>检查子进程的{d},并将其放在cd6}的子进程之后。我在应用程序中有几个print语句,它们通过套接字打印到stdout上的每个调用和响应,但它没有接收任何输入,因此{}当然会超时。在

我以前从未在zmq中遇到过这种问题,所以我认为这可能与子进程的使用有关。有没有人知道可能是什么问题以及如何解决它?在

谢谢。在


更新:所以看起来进程没有终止(尽管在主应用程序中使用了signal.signal(signal.SIGTERM, close_app)信号)。这是否会导致有关通过zmq与之通信的活动进程的混乱?最初调用p.kill()而不是p.terminate()似乎起到了作用,尽管仍然以同样的方式失败了一两次。在


更新2:似乎正在工作的东西正在直接调用命令kill

^{pr2}$

在大多数情况下,它似乎优雅地关闭了它。在


Tags: import脚本jsonstringsignal进程responsestdout
1条回答
网友
1楼 · 发布于 2024-04-26 13:51:32

有什么问题吗

可能与所述子进程中未发布的代码有关,这会导致在子进程强制终止期间启动的观察到的行为(包括它的其他资源,在一个智能且功能非常丰富的多线程zmq.Context( n_IO_threads = 1 )实例中管理,看不见&;在有限的先验编码/执行控制中)。在

考虑一下{SIGTERM|SIGKILL|...}而不是紧急刹车,
紧急按钮,
在分布式系统设计中不是一个明智的解决方案

一旦进入分布式系统设计,人们应该忘记使用类似于SIGTERM等的无上下文工具,但最好将自己的软信号控制平面纳入新设计的分布式系统基础设施中。在

这有助于“远程”代理根据此类软信号的实际上下文进行操作,并允许(在您的完全算法控制下)执行所有必要的安全保护、资源清理和终止前的职责,以便最终优雅地清理退出。在

在这一点上,我可能听起来有些过时,但在代码最终指示所有zmq.Context()实例到.term()之前,总是将套接字显式地指示为.term()。据报道,这是不必要的,但在分布式系统的设计/实现中,做到资源处理的干净和公平是一项公平的职责。在

没有例外,没有借口。在

该死的忘了0在ZMQ_LINGER

一个值得一提的例子是ZeroMQ API参数的默认值,如果没有设置,则默认值为0,这意味着一旦这样的ZeroMQ-socket实例(显式或隐式)被指示到.close(),并且还有一个ZMQ_LINGER == 0,套接字端点将BLOCK直到来自交易对手缓冲区的所有消息被传递,这可能导致您的分布式处理挂起,而没有任何机会在事后解决此类死锁,如果没有正确地预先设置,则不会永远等待挂起的消息。在

一个更新的pyzmq文档明确警告不要.destroy()一个zmq.Context实例(并且盲目地让一个权威发布的.destroy()-d获取套接字.close()-d,这是一个排除了自己代码控件的权限)

ctx.destroy( linger = None )
Close all sockets associated with this context, and then terminate the context. If linger is specified, the LINGER sockopt of the sockets will be set prior to closing.

Warning

.destroy involves calling zmq_close(), which is NOT threadsafe. If there are active sockets in other threads, this must not be called ( which advice, most probably, the SIGTERM & al will ignore, wouldn't it? )

因此,甚至还有一些理由不依赖SIGTERM魔鬼的服务。在

正在使用的端口

另外,释放占用的传输类资源需要一些时间。因此,拥有一个刚刚发布了一个IP:port的代码并不意味着另一个实例/进程/线程可以直接跳入并捕捉同一个端口,而不存在与O/S相关的延迟。相反,在这方面也要检查一下你的资源重用/释放策略(我敢冒这个区域阻塞的风险,用一些端口地址池来轮换和排队,这样至少可以推迟任何潜在的重用情况,在一个合理的O/S相关的延迟完全过期之前,IMHO阻止阻塞状态比事后处理阻塞状态上的异常要好得多)。在

.bind()之前.connect()

是另一个这样的问题。一旦您的subprocess.Popen(...)启动,在O/S服务启动并使子进程开始独立呼吸之前需要一段时间。在

如果已经处于活动状态并正在执行的第一个进程已到达.connect(),则在派生的子进程实例到达.bind()之前,分布式系统将阻塞。在

安装/拆卸往返时间不可减少到零。资源不是一次性的。有一些与系统相关的维护和共享开销与它们的使用有关。在

最后.recv_string()可能并确实引发了ZMQError EAGAIN

在某些情况下,还没有任何消息r已经在本地节点中通过任何.recv*()方法获取它,无论它是flags = zmq.NOBLOCK模式下的{.recv|.recv_string|.recv_json|&al}。在

相关问题 更多 >