我使用简单的一对一PUSH/PULL
工作者/服务器python代码来发送和接收消息。你知道吗
worker使用PUSH
套接字向PULL
服务器发送消息。服务器处理单元没有工作进程强大,因此当发送太多消息时,服务器的RAM开始增长,直到系统杀死所有消息。你知道吗
我试着设置接收器高水位线如下:
worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)
以下代码用于worker创建和发送消息:
sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)
它似乎纠正了这个问题,但我的猜测是溢出消息只是由服务器丢弃,这是不可接受的在我的情况。你知道吗
我已经使用这个thread建立了我的开发基础,但是我不确定是否理解答案的附加信息部分。你知道吗
所以问题是,HWM在noblock push/pull zeromq套接字上的真正行为是什么?有没有一种方法可以保证所有发送的消息都能被pull套接字接收,而不会膨胀内存或阻塞发送方?你知道吗
有办法吗?是的,有:
内置的ZeroWarranty(包括作为原始邮件的1:1位副本传递的邮件或根本不传递的邮件)需要通过应用程序级协议进行扩展(包括未传递邮件的重新发送,在确认之前)或将您的基础设施移到使用特定的有保证的交付协议,这将有助于实现这一点,高于标准,要求用户使用
norm://
传输类扩展并移动范例,在这种情况下,PUSH/PULL
仍然没有处于RTO状态,而是进入了PUB/SUB, XPUB/XSUB
可伸缩的正式通信模式原型。你知道吗膨胀内存需求有两种方法:一种是对
.send()
-er进行显式控制,而不是将.send()
-er端的Context()
-实例的资源(RAM)淹没,即在有限的资源限制范围内(主要是防止发生任何淹没/丢弃的消息),另一个-有足够的RAM和正确配置的Context()
-实例,让所有的数据流通过。你知道吗首先,让我们解开这个谜团。
ZMQ_NOBLOCK
-指令指向本地,.send()
-端Context()
立即将对.send()
-方法的调用返回给调用者,即不阻止调用代码的执行(消息有效负载放在本地ZeroMQContext()
-实例内进行进一步处理,而不管其内部状态如何-这是一种经典的非阻塞代码设计 ). 你知道吗相反,
ZMQ_SNDHWM
指示Context()
-实例,如何设置此套接字的阈值,以及对于所述PUSH/PULL
.send()
-er情况:同时使用
ZMQ_XPUB_NODROP
可能有助于norm://
传输类用例。你知道吗还要注意的是,默认情况下,
ZMQ_PUSH
-sockets的API确认:对于表现欠佳的嫌疑犯(在
PULL
侧),还可以在O/S侧测试适当大小的设置,使用.getsockopt( ZMQ_RCVBUF )
-方法,并根据需要使用适当、足够大的.setsockopt( ZMQ_RCVBUF )
调整大小:如果上面没有任何帮助,可以使用
zmq_socket_monitor
服务将自诊断元平面注入ZeroMQ基础结构,并获得对通常发生在应用程序代码视线之外的情况的完全控制(根据需要反映内部API状态和转换)。你知道吗决定权在你。你知道吗
我建议您在中间(发送者和接收者之间)添加一个代理,它将保存给定时间内发送的消息。您将不得不进行代码逻辑,以保存消息,并在服务器未收到特定消息时收到通知。0mq不提供保存或带回丢失消息的方法。你知道吗
相关问题 更多 >
编程相关推荐