如何在python中有效地缓冲来自原始套接字的udp数据包?

2024-04-29 15:40:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在用python编写一个包捕获/解析器实用程序。它工作得很好,但我有一个相当严重的内存泄漏。当我使用性能分析器时,它总是由我自己决定下一个()行作为内存累积的位置。我已经尝试了所有我能找到的实时网络流模块(pypcap、pcapy、scapy、pyshark),但似乎没有一个能改善漏洞。然而,直接使用socket不仅开始时占用的空间非常小,而且似乎以这样一种名义上的速度增长。你知道吗

然而,socket的问题是我错过了很多数据包。我可以说,因为我捕获的udp数据包在有效负载的前几个字节中有一个序列号,我用struct解包。我有一个tcpdump并排运行,tcpdump没有丢失任何数据包。你知道吗

我猜我丢失这么多数据包的原因是由于socket的接收缓冲区很小。我把它增加到了我的操作系统允许的最大值,通过实验看起来是425984字节。似乎没有什么不同。我的计划是尽可能快地将消息从网络上取下,并将它们放入队列中,这样我就可以有更多的喘息空间来处理数据包。到目前为止我还没有成功。我首先实现了一个队列,然后切换到heapq,但没有看到任何改进。你知道吗

下面是我的代码的接收部分,使用socket和heapqueue。通过调试,我几乎可以肯定丢失的数据包永远不会进入堆。你知道吗

#list for heap queue
uq []
#last sequence number for gap check
lastSeq = 0

#create raw socket 
s = socket.socket( socket.AF_PACKET , socket.SOCK_RAW , socket.ntohs(0x0003))
#check default buffer size
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [Before]:%d" %bufsize)
#increase the buffer
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,450560)
#check that buffer has been increased 
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [After]:%d" %bufsize)
#narrow scope to specific interface in effort to cut down on throughput
iface_name = "em1'
s.bind((iface_name,0))


def parse():
    eth_length = 14
#wait 2 seconds for queue to fill
    time.sleep(2)
    while True:
       netq = heappop(uq)
       packet = netq[0]
       ancdata = netq[1]

       eth_length = 14
       eth_header = packet[:eth_length]
       eth = struct.unpack('!6s6sH' , eth_header)
       eth_protocol = socket.ntohs(eth[2])
       if eth_protocol == 8:
           ip_header = packet[eth_length:20+eth_length]
           iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
           protocol = iph[6]
           if protocol == 17:
               try:
                    MsgSeqNum = struct.unpack_from("<i", udp.body, offset=0)[0]
                    if (src in lastSeq) and (seqnum != lastSeq[src] + 1):
                        print('[udpparser] gap detected within parser! last/now',lastSeq[src],seqnum)
                        lastSeq[src] = seqnum
                    else:
#                        print('[udpparser] for good measure, no gap last/now',lastSeq[src],seqnum)
                         lastSeq[src] = seqnum       

netthread = threading.Thread(target=parse, )
netthread.daemon = True
netthread.start()


while True:
    packet, ancdata, flags, address = s.recvmsg(65545, 1024)

    heappush(uq,(packet,ancdata,))


Tags: srcforpacketsocket数据包protocollengthstruct