ZeroMQ CLRZMQ - 轮询套接字

2 投票
3 回答
1370 浏览
提问于 2025-04-18 05:03

我在用Python的Pyzmq库做ZMQ的工作,最近需要把一些代码移植到C#,用的是CLRZMQ这个绑定。我在网上找了很久文档,但几乎没找到什么有用的资料,只有一些过时的代码示例,根本不能用,这让我很沮丧。

我现在想实现的功能非常简单。我们有一个简单的函数,它会在一个套接字上进行轮询,并设置一个超时时间。当超时到达时,它会抛出一个自定义的异常。下面是Python中的代码示例:

def raise_on_timeout(sock, timeout):
    """
    sock is a ZMQ socket
    timeout is a timedelta object from datetime.
    """
    if timeout is not None:
        timeout = int(timeout.total_seconds() * 1000)
        if not sock.poll(timeout):
            raise TimeoutException("Communication timed out")

看起来很简单,对吧?这个函数是在send_multipart和recv_multipart之间调用的,这样如果在接收数据时超时,我们就可以处理这个情况。

那么在C#中用CLRZMQ怎么实现类似的功能呢?

另外,如果有关于CLRZMQ的API文档的话,麻烦告诉我在哪里可以找到。

谢谢!

3 个回答

0

好的,我成功让它工作了。

这是用C#写的一个类似于raise_on_timeout的方法的代码。

public static void RaiseOnTimeout(Context ctx, Socket sock, TimeSpan timeout)
{
    List<PollItem> pollItemsList = new List<PollItem>();
    PollItem pollItem = sock.CreatePollItem(IOMultiPlex.POLLIN);
    pollItemsList.Add(pollItem);

    int numReplies = ctx.Poll(pollItemsList.ToArray(), timeout.Value.Ticks * 10);

    if (numReplies == 0)
    {
        throw new TimeoutException();
    }
}

这样就可以了。不过这并不明显,因为C#的文档非常少。我只是从指南中的Java示例中获得了灵感,甚至那里也没有详细解释每个部分的作用。只好自己慢慢琢磨出来。

0

好的。我觉得可能找到了答案。不过我还没有测试过。

为了实现我想在Python中做的事情,这个应该可以用同样的方式做到。

private static void RaiseOnTimeout(Socket sock, long timeoutMicroSeconds)
{
    List<socket> sockList = new List<Socket>() {sock};
    int numEvents = Context.Poller(sockList), timeoutMicroSeconds);

    if (numEvents == 0)
    {
        throw new TimeoutException();
    }
}

大家有什么想法吗?

2

好的,这里是最终的答案。在这个例子中,我甚至不需要使用Context的实例。这样做要好得多。

下面是用C#编写一个类似的raise_on_timeout方法的代码。

public static void RaiseOnTimeout(Socket sock, TimeSpan timeout)
{
    List<PollItem> pollItemsList = new List<PollItem>();
    PollItem pollItem = sock.CreatePollItem(IOMultiPlex.POLLIN);
    pollItemsList.Add(pollItem);

    int numReplies = Context.Poller(pollItemsList.ToArray(), timeout.Value.Ticks * 10);

    if (numReplies == 0)
    {
        throw new TimeoutException();
    }
}

撰写回答