高效的Python原始输入和串口轮询

2 投票
1 回答
5274 浏览
提问于 2025-04-18 13:26

我正在做一个Python项目,这个项目需要从COM端口获取数据,同时也要获取用户输入。目前程序运行得很好,但我觉得效率不高。我把从串口获取数据的部分放在一个单独的线程里,用一个循环不断获取数据,然后把数据放进一个队列里。用户输入的部分也是在一个单独的线程里,用循环获取输入,然后也放进一个队列里。不过,我的代码太多了,发上来会让问题变得复杂。

所以,有没有更有效的方法来获取串口数据或者用户输入,而不是让它们在无限循环中运行在各自的线程里呢?

我在这个话题上做了很多研究,发现大家都在用“单独线程和队列”的方法。然而,当我运行这个程序时,我的四核i7处理器几乎用了30%的CPU资源,感觉应该有更好的办法。

我之前在C语言中使用过中断服务程序(ISR),希望能找到类似中断的东西可以用。最近我发现了很多带回调的“事件”库,但我总是搞不清它们如何适应我的情况。我是在一台Windows 7(64位)机器上开发,但完成后会把产品移到树莓派上。我并不想要代码,只是希望能得到一些方向上的指导。谢谢你提供的任何信息。

1 个回答

6

你看到CPU使用率很高,是因为你的主线程在用非阻塞的 get_nowait 方法不停地检查两个不同的队列,这样就会导致它一直在循环。这个循环一直在跑,就像任何紧凑的无限循环一样,会消耗CPU的计算能力。为了减少CPU的使用,你应该让无限循环使用阻塞的输入输出,这样它会等到有数据可以处理时再继续运行。这样一来,你就不会一直在循环中跑,从而减少了CPU的使用。

所以,用户输入线程:

while True:
    data = raw_input() # This blocks, and won't use CPU while doing so
    queue.put({'type' : 'input' : 'data' : data})

COM线程:

while True:
    data = com.get_com_data()  # This blocks, and won't use CPU while doing so
    queue.put({'type' : 'COM' : 'data' : data})

主线程:

while True:
    data = queue.get() # This call will block, and won't use CPU while doing so
    # process data

阻塞的 get 方法会一直等到被另一个线程的 put 唤醒,这个过程是通过 threading.Condition 对象来实现的。它不会反复去检查。从 Queue.py

# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = _threading.Condition(self.mutex)

...

def get(self, block=True, timeout=None):
    self.not_empty.acquire()
    try:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()  # This is where the code blocks
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = _time() + timeout
            while not self._qsize():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

def put(self, item, block=True, timeout=None):
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            if not block:
                if self._qsize() == self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while self._qsize() == self.maxsize:
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()  # This is what wakes up `get`
    finally:
        self.not_full.release()

撰写回答