替代字节数组处理瓶颈的高速方案

8 投票
1 回答
1806 浏览
提问于 2025-04-18 05:44

>> 请看下面的编辑内容 <<

我正在使用FTDI D2xx驱动通过pyUSB处理来自一个特殊像素化CCD相机的数据,这个相机通过串口与电脑连接。

这个相机的传输速度很快,可以达到每秒80帧。我希望能达到这个速度,但我知道用Python实现这个速度是不太可能的,因为Python是一种脚本语言。不过,我想知道我能接近这个速度多少,是否有我在代码中遗漏的优化、线程处理,或者其他的解决方案。我马上想到可以把耗时最长的循环提取出来,放到C语言中去做,但我对C语言的经验不多,不太确定如何让Python与C代码进行交互,如果这是可能的话。我在Python中用SciPy/Numpy开发了复杂的算法,这些算法已经优化过,性能也可以接受,所以我需要一种方法来加快数据获取的速度,以便反馈给Python,如果这是最好的方法。

我选择Python的原因是因为我需要能够轻松地在不同平台上运行(我在Windows上开发,但要把代码放到一个嵌入式Linux板上,做成一个独立系统)。如果你建议我使用其他语言,比如C,我该如何在不同平台上工作?我从来没有在Windows和Linux之间编译过像C这样的低级语言,所以我想确保这个过程——我需要为每个系统编译一次,对吧?你有什么建议?


以下是我的函数和当前的执行时间:

ReadStream: 'RXcount'为114733,表示从设备读取数据,将字符串格式化为字节等价物

返回一个字节列表(0-255),表示二进制值

当前执行时间:0.037秒

def ReadStream(RXcount):
    global ftdi
    RXdata = ftdi.read(RXcount)
    RXdata = list(struct.unpack(str(len(RXdata)) + 'B', RXdata))
    return RXdata


ProcessRawData: 将字节列表重塑为与像素方向匹配的数组

处理后得到一个3584x32的数组,去掉了一些不需要的字节。

数据的独特之处在于,每14行代表设备上一个像素行的14位(每行有32个字节,每个字节8位,总共256位),也就是256x256个像素。处理后的数组有32列字节,因为每个字节在二进制中表示8个像素(32字节 * 8位 = 256个像素)。我还在研究如何实现这一点... 我之前已经为此发布了一个问题

当前执行时间:0.01秒...还不错,这只是Numpy的效果

def ProcessRawData(RawData):
    if len(RawData) == 114733:
        ProcessedMatrix = np.ndarray((1, 114733), dtype=int)
        np.copyto(ProcessedMatrix, RawData)
        ProcessedMatrix = ProcessedMatrix[:, 1:-44]
        ProcessedMatrix = np.reshape(ProcessedMatrix, (-1, 32))
        return ProcessedMatrix
    else:
        return None


最后,

GetFrame: 设备有一种模式,只输出像素是否检测到任何东西,使用数组的最低位(每14行) - 获取这些数据并将每个像素转换为整数

处理后得到一个256x256的数组,处理每14行,这些行是以二进制读取的字节(32字节... 32字节 * 8位 = 256个像素)

当前执行时间:0.04秒

def GetFrame(ProcessedMatrix):
    if np.shape(ProcessedMatrix) == (3584, 32):
        FrameArray = np.zeros((256, 256), dtype='B')
        DataRows = ProcessedMatrix[13::14]
        for i in range(256):
            RowData = ""
            for j in range(32):
                RowData = RowData + "{:08b}".format(DataRows[i, j])
            FrameArray[i] = [int(RowData[b:b+1], 2) for b in range(256)]
        return FrameArray
    else:
        return False


目标:

我希望通过你们的建议将总执行时间目标定在约0.02秒/帧(目前是0.25秒/帧,GetFrame函数是最慢的)。设备的输入输出不是限制因素,因为它每0.0125秒输出一个数据包。如果我能降低执行时间,那我可以通过一些线程处理来并行运行获取和处理吗?

请告诉我你认为的最佳解决方案 - 谢谢你的帮助!


编辑,感谢@Jaime:

现在的函数是:

def ReadStream(RXcount):
    global ftdi
    return np.frombuffer(ftdi.read(RXcount), dtype=np.uint8)

... 时间 0.013秒

def ProcessRawData(RawData):
    if len(RawData) == 114733:
        return RawData[1:-44].reshape(-1, 32)
    return None

... 时间 0.000007秒!

def GetFrame(ProcessedMatrix):
    if ProcessedMatrix.shape == (3584, 32):
        return np.unpackbits(ProcessedMatrix[13::14]).reshape(256, 256)
    return False

... 时间 0.00006秒!

所以,现在我能以所需的帧率获取数据了!经过对D2xx USB缓冲区和延迟时间的一些调整,我现在的速度是47.6帧每秒!

最后一步是看看有没有办法让这个与我的处理算法并行运行?需要一种方法将GetFrame的结果传递给另一个并行运行的循环。

1 个回答

5

有几个地方可以让你的程序运行得更快。最明显的就是重写 GetFrame 这个函数:

def GetFrame(ProcessedMatrix):
    if ProcessedMatrix.shape == (3584, 32):
        return np.unpackbits(ProcessedMatrix[13::14]).reshape(256, 256)
    return False

这要求 ProcessedMatrix 是一种类型为 np.uint8ndarray,不过除此之外,在我的系统上,这个函数的运行速度快了1000倍。

对于你另外两个函数,我觉得在 ReadStream 中你可以尝试这样做:

def ReadStream(RXcount):
    global ftdi
    return np.frombuffer(ftdi.read(RXcount), dtype=np.uint8)

即使这样做对这个函数的速度提升不大,因为读取数据本身占用了大部分时间,但这样你至少可以得到一个字节的 numpy 数组来处理。接下来,你可以继续使用 ProcessRawData,试试:

def ProcessRawData(RawData):
    if len(RawData) == 114733:
        return RawData[1:-44].reshape(-1, 32)
    return None

这个版本的速度比你之前的快了10倍。

撰写回答