Pyzmq向流s发送消息

2024-03-28 14:04:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试在pyzmq中实现两个流套接字之间的连接的简单示例。在

发件人.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

socket.connect("tcp://localhost:5556")
socket.send("message")

接收器.py

^{pr2}$

输出

Received [ b'\x00k\x8bEg' ]
Received [ b'' ]

我想问一下在流套接字之间发送消息的正确方法是什么。在


Tags: pyimportlocalhost示例streambindconnectcontext
2条回答

下面是一个使用pyzmq进行单向连接的简化示例。在

发件人.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.connect("tcp://localhost:5555")
id = socket.getsockopt(zmq.IDENTITY)
socket.send(id, zmq.SNDMORE)
socket.send(b"message")

接收器.py

^{pr2}$

您的socket.recv()-ed数据与ZeroMQ规范完全匹配,尽管它们不必让您感到高兴,而且您怀疑为什么会得到这些数据,而不是准确地传递所发送消息的精确副本。在

所以,耐心点,继续阅读。在

ZeroMQ最近添加了STREAM套接字原型相当具体

任何对ZeroMQ信令/消息传递工具有几年经验的人都会告诉您,最近(v4.x)添加的STREAM原型并不是ZeroMQ进程与ZeroMQ进程相互通信需求的最佳选择。在

为什么?ZeroMQ工具所拥有的所有gem都是并且必须是STREAM中的快捷方式,以便允许ZeroMQ套接字访问点能够与另一个套接字端点进程“对话”,后者对ZeroMQ智能套接字高级协议一无所知。在

Native pattern

The native pattern is used for communicating with TCP peers and allows asynchronous requests and replies in either direction. ZMQ_STREAM

A socket of type ZMQ_STREAM is used to send and receive TCP data from a non-ØMQ peer, when using the tcp:// transport. A ZMQ_STREAM socket can act as client and/or server, sending and/or receiving TCP data asynchronously.

When receiving TCP data, a ZMQ_STREAM socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers.

When sending TCP data, a ZMQ_STREAM socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.

To open a connection to a server, use the zmq_connect() call, and then fetch the socket identity using the ZMQ_IDENTITYzmq_getsockopt() call.

To close a specific connection, send the identity frame followed by a zero-length message (see EXAMPLE section).

When a connection is made, a zero-length message will be received by the application. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.

You must send one identity frame followed by one data frame. The ZMQ_SNDMORE flag is required for identity frames but is ignored on data frames.

EXAMPLE

void    *ctx = zmq_ctx_new ();
assert ( ctx );
/*                                             Create ZMQ_STREAM socket */
void    *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );

int      rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );

/*                                            Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t  id_size = 256;

/*                                            Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t  raw_size = 256;

while ( 1 ) {
   /*                                         Get HTTP request; ID frame and then request */
   id_size  = zmq_recv ( socket, id, 256, 0 );
   assert ( id_size >  0 );
   do {
        raw_size  = zmq_recv ( socket, raw, 256, 0 );
        assert ( raw_size >= 0 );
   } while (     raw_size == 256 );
   /*                                         Prepares the response */
   char http_response [] =
                            "HTTP/1.0 200 OK\r\n"
                            "Content-Type: text/plain\r\n"
                            "\r\n"
                            "Hello, World!";
   /*                                         Sends the ID frame followed by the response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, http_response, strlen ( http_response ), 0 );

   /*                                         Closes the connection by sending the ID frame followed by a zero response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );

如果您遵循多连接套接字情况下STREAM行为的描述,那么发送方将碰巧在socket实例上接收一个公平队列循环读取,该实例连接到多个端点(1x通过.connect()+Nx,通过.bind()N = < 0, +INF ))连接到多个端点,到目前为止还没有对计数进行任何控制或者/以及通信对等点的性质,但是在socket.recv()-s上有一个公平的排队循环机制,这绝对不是一个安全的设计实践。在

^{pr2}$

相关问题 更多 >