pythonzeromq推/拉逻辑,在不丢失任何消息的情况下,将高水位线设置为低端拉器

2024-05-29 00:29:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用简单的一对一PUSH/PULL工作者/服务器python代码来发送和接收消息。你知道吗

worker使用PUSH套接字向PULL服务器发送消息。服务器处理单元没有工作进程强大,因此当发送太多消息时,服务器的RAM开始增长,直到系统杀死所有消息。你知道吗

我试着设置接收器高水位线如下:

worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)

以下代码用于worker创建和发送消息:

sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)

它似乎纠正了这个问题,但我的猜测是溢出消息只是由服务器丢弃,这是不可接受的在我的情况。你知道吗

我已经使用这个thread建立了我的开发基础,但是我不确定是否理解答案的附加信息部分。你知道吗

所以问题是,HWM在noblock push/pull zeromq套接字上的真正行为是什么?有没有一种方法可以保证所有发送的消息都能被pull套接字接收,而不会膨胀内存或阻塞发送方?你知道吗


Tags: 代码in服务器消息portmsgzmqpush
2条回答

Q : is there a way to have a push pull infrastructure that guaranties all sent messages will be received by the pull socket without inflating its memory or blocking the sender?

有办法吗?是的,有:

内置的ZeroWarranty(包括作为原始邮件的1:1位副本传递的邮件或根本不传递的邮件)需要通过应用程序级协议进行扩展(包括未传递邮件的重新发送,在确认之前)或将您的基础设施移到使用特定的有保证的交付协议,这将有助于实现这一点,高于标准,要求用户使用norm://传输类扩展并移动范例,在这种情况下,PUSH/PULL仍然没有处于RTO状态,而是进入了PUB/SUB, XPUB/XSUB可伸缩的正式通信模式原型。你知道吗

A new transport option is available in libzmq. The "norm_engine.hpp" and "norm_engine.cpp" files provide an implementation of a NACK-Oriented Reliable Multicast (NORM) transport protocol option for ZeroMQ. NORM is an IETF open standards protocol specified in RFC 5740 and supporting documents. The Naval Research Laboratory (NRL) provides an open source reference implementation that is hosted at http://www.nrl.navy.mil/itd/ncs/products/norm.

NORM supports reliable data delivery over IP multicast but also supports unicast (point-to-point) data transfers. NORM operates on top of the User Datagram Protocol (UDP) and supports reliability via a NACK-based Automated Repeat Request (ARQ) that uses packet erasure coding for very efficient group communication. NORM also provides for automated TCP-friendly congestion control and mechanisms for support end-to-end flow control. The NRL NORM implementation can also be configured to provide basic UDP-like best effort transport service (with no receiver feedback) and this can be enhanced by adding some amount application-settable proactive forward error correction (FEC) packets to the transmission. I.e., by default NORM only sends 'reactive' FEC repair packets in response to NACKs but can also be configured to proactively send added repair packets for a level of reliability without any feedback from the receivers. In addition to its TCP-friendly congestion control, NORM can also be configured for fixed-rate operation and the NRL implementation supports some additional automated congestion control options suitable for use in bit error prone wireless communication environments. While its reliable ARQ operation is principally NACK-based (negative acknowledgement when packet loss is detected), it also supports optional positive acknowledgment (ACK) from receivers that can be used for delivery confirmation and explicit flow control.

膨胀内存需求有两种方法:一种是对.send()-er进行显式控制,而不是将.send()-er端的Context()-实例的资源(RAM)淹没,即在有限的资源限制范围内(主要是防止发生任何淹没/丢弃的消息),另一个-有足够的RAM和正确配置的Context()-实例,让所有的数据流通过。你知道吗


Q : what are the real behavior of HWM reached on noblock push/pull zeromq sockets?

首先,让我们解开这个谜团。ZMQ_NOBLOCK-指令指向本地,.send()-端Context()立即将对.send()-方法的调用返回给调用者,即不阻止调用代码的执行(消息有效负载放在本地ZeroMQContext()-实例内进行进一步处理,而不管其内部状态如何-这是一种经典的非阻塞代码设计 ). 你知道吗

相反,ZMQ_SNDHWM指示Context()-实例,如何设置此套接字的阈值,以及对于所述PUSH/PULL.send()-er情况:

he high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory for any single peer that the specified socket is communicating with. A value of zero means no limit.

If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, ØMQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in zmq_socket(3) for details on the exact action taken for each socket type.

ØMQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages, and the actual limit may be as much as 60-70% lower depending on the flow of messages on the socket.

同时使用ZMQ_XPUB_NODROP可能有助于norm://传输类用例。你知道吗

还要注意的是,默认情况下,ZMQ_PUSH-sockets的API确认:

When a ZMQ_PUSH socket enters the mute state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the mute state ends or at least one downstream node becomes available for sending; messages are not discarded.


对于表现欠佳的嫌疑犯(在PULL侧),还可以在O/S侧测试适当大小的设置,使用.getsockopt( ZMQ_RCVBUF )-方法,并根据需要使用适当、足够大的.setsockopt( ZMQ_RCVBUF )调整大小:

The ZMQ_RCVBUF option shall set the underlying kernel receive buffer size for the socket to the specified size in bytes. A value of -1 means leave the OS default unchanged. For details refer to your operating system documentation for the SO_RCVBUF socket option.


如果上面没有任何帮助,可以使用zmq_socket_monitor服务将自诊断元平面注入ZeroMQ基础结构,并获得对通常发生在应用程序代码视线之外的情况的完全控制(根据需要反映内部API状态和转换)。你知道吗

决定权在你。你知道吗

我建议您在中间(发送者和接收者之间)添加一个代理,它将保存给定时间内发送的消息。您将不得不进行代码逻辑,以保存消息,并在服务器未收到特定消息时收到通知。0mq不提供保存或带回丢失消息的方法。你知道吗

相关问题 更多 >

    热门问题