我可以避免Python中线程化的UDP套接字丢失数据吗?

3 投票
5 回答
3572 浏览
提问于 2025-04-15 20:29

首先,我刚开始学习Python,正在工作中学习,所以请多多包涵!

我想写一个在Windows上运行的多线程Python应用程序,它的功能是从UDP套接字读取数据(线程1),把数据写入文件(线程2),并把实时数据展示到一个小部件上(gtk.Image,使用gtk.gdk.pixbuf)。我在不同线程之间用队列来传递数据。

我的问题是,如果我只启动线程1和线程3(暂时不写文件),似乎在前几个数据样本之后会丢失一些数据。虽然在这之后看起来没问题。即使我让线程1先完成再运行线程3,这种数据丢失的现象依然存在。

抱歉代码片段有点长(我去掉了写文件的线程),但我觉得去掉代码会引发更多问题。希望有人能帮我解答一下 :-)

import socket
import threading
import Queue
import numpy
import gtk
gtk.gdk.threads_init()
import gtk.glade
import pygtk


class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'


class displayWithGTK(threading.Thread):

    def __init__(self, displayDataQueue, image, viewArea):
        threading.Thread.__init__(self)
        self.displayDataQueue = displayDataQueue
        self.image = image
        self.viewWidth = viewArea[0]
        self.viewHeight = viewArea[1]
        self.displayData = numpy.zeros((self.viewHeight, self.viewWidth, 3), dtype=numpy.uint16)

    def run(self):
        scan = 0
        try:
            while True:
                if not scan % self.viewWidth: scan = 0
                buffer = self.displayDataQueue.get(timeout=0.1)
                self.displayData[:, scan, 0] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 1] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 2] = numpy.fromstring(buffer, dtype=numpy.uint16)
                gtk.gdk.threads_enter()
                self.myPixbuf = gtk.gdk.pixbuf_new_from_data(self.displayData.tostring(), gtk.gdk.COLORSPACE_RGB,
                                                        False, 8, self.viewWidth, self.viewHeight, self.viewWidth * 3)
                self.image.set_from_pixbuf(self.myPixbuf)
                self.image.show()
                gtk.gdk.threads_leave()
                scan += 1
        except Queue.Empty:
            print 'myDisplay finished!'
            pass


def quitGUI(obj):
    print 'Currently active threads: %s' % threading.enumerate()
    gtk.main_quit()


if __name__ == '__main__':

    # Create socket (IPv4 protocol, datagram (UDP)) and bind to address
    socketUDP = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = '192.168.1.5'
    port = 1024
    socketUDP.bind((host, port))

    # Data parameters
    samplesPerScan = 256
    packetsPerSecond = 1200
    packetSize = 512
    duration = 1  # For now, set a fixed duration to log data
    numScans = int(packetsPerSecond * duration)

    # Create array to store data
    data = numpy.zeros((samplesPerScan, numScans), dtype=numpy.uint16)

    # Create queue for displaying from
    readDataQueue = Queue.Queue(numScans)

    # Build GUI from Glade XML file
    builder = gtk.Builder()
    builder.add_from_file('GroundVue.glade')
    window = builder.get_object('mainwindow')
    window.connect('destroy', quitGUI)
    view = builder.get_object('viewport')
    image = gtk.Image()
    view.add(image)
    viewArea = (1200, samplesPerScan)

    # Instantiate & start threads
    myServer = readFromUDPSocket(socketUDP, readDataQueue, packetSize, numScans)
    myDisplay = displayWithGTK(readDataQueue, image, viewArea)

    myServer.start()
    myDisplay.start()

    gtk.gdk.threads_enter()
    gtk.main()
    gtk.gdk.threads_leave()
    print 'gtk.main finished!'

5 个回答

-1

看起来问题出在数据源上。主要有两个问题:

  1. 通过Wireshark这个工具观察,数据源并不是一直以每秒1200个数据包的速度发送。可能正如Len提到的,出站的数据处理过程中有丢包的问题。顺便说一下,这个数据源是一个可编程的卡片,带有一个以太网接口,连接到我的电脑上。

  2. 另一个问题是,在前15个数据包之后,总是会有丢包的情况。我发现如果在readFromUDPSocket线程的初始化部分接收20个数据包,那么之后就能正常读取数据,比如:

class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans
        for i in range(0, 20):
            buffer = self.socketUDP.recv(self.packetSize)

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'

我不太确定这指向什么问题?!不过我觉得这些都排除了接收和处理速度不够快的可能性。

2

UDP本身就是一种不可靠的协议。你绝对不能写程序去期待UDP的数据包总是能顺利到达。

在TCP中,数据包也会丢失,但你的程序不需要担心,因为TCP应用程序处理的不是数据包,而是一串字节流。这里有很多机制确保你发送的字节'ABCD',最终会按顺序显示为'A'、'B'、'C'、'D'。当然,你可能会收到任何组合的数据包,比如'ABC'、'D',或者'AB'、'CD'等等。也可能你只收到'ABC',然后就没有了。

TCP之所以被称为“可靠”,并不是因为它能让你的网络线缆永远不出问题;它提供的保证是,在数据流中断之前,你会看到所有数据都是按顺序到达的。而一旦数据流中断,你就什么也看不到了。

而在UDP中,没有这样的保证。如果你发送四个UDP数据包,'AB'、'CD'、'EF'、'GH',你可能会收到全部,也可能一个都收不到,或者只收到一半,甚至只收到其中一个。你收到的顺序也可能是任意的。UDP唯一能保证的是,你不会看到包含'ABCD'的消息,因为这些字节是在不同的数据包中。

总结一下:这和Python、线程或GTK没有关系。这只是网络中一个基本的事实:有时候,你的电缆的电气特性并不适合让信息顺利传输。

你可以通过使用Twisted来简化程序的复杂性,特别是使用listenUDP API,这样你就不需要处理线程或它们与GTK的交互:你可以直接在相关的控件上调用方法,而不需要通过datagramReceived方法来传递。但是,这并不能解决你根本的问题:UDP有时候就是会丢失数据。真正的解决办法是说服你的数据源使用TCP。

4

UDP协议不会像TCP那样确认目标是否接收到数据——如果你想确保所有数据都能送到,就得在你的应用程序里自己实现重发等功能。你控制发送UDP数据的源头吗?

撰写回答