Python中的调度问题

0 投票
3 回答
4511 浏览
提问于 2025-04-15 13:16

我正在用Python来和一个硬件USB嗅探器设备进行交互,使用的是厂商提供的Python接口。我想在一个单独的线程里不断读取设备中的USB数据包(这个部分运行得很好)。问题是我的主循环似乎再也没有机会运行了,因为我的读取循环占用了所有的注意力。

我的代码大概是这样的:

from threading import Thread
import time
usb_device = 0

def usb_dump(usb_device):
    while True:
        #time.sleep(0.001)
        packet = ReadUSBDevice(usb_device)
        print "packet pid: %s" % packet.pid

class DumpThread(Thread):
    def run(self):
        usb_dump()

usb_device = OpenUSBDevice()
t = DumpThread()
t.start()
print "Sleep 1"
time.sleep(1)
print "End"
CloseUSBDevice(usb_device)
sys.exit(0)

(我可以贴出实际的代码,但因为你需要硬件设备,所以我觉得这样没什么帮助)。

我希望这段代码能在主线程结束整个程序之前,先运行大约一秒钟,开始输出USB数据包。然而,我看到的只是“Sleep 1”,然后usb_dump()这个过程就一直在运行。如果我在usb_dump()过程的内部循环中取消注释“time.sleep(0.001)”这一行,事情就开始按照我预期的方式运行了,但这样Python代码就无法跟上所有进来的数据包了 :-(

厂商告诉我这是Python调度器的问题,不是他们API的错,所以他们不会帮我:

«不过,看起来你在使用Python的线程时遇到了一些细节问题。通过在DumpThread线程中放置time.sleep,你是在明确告诉Python的线程系统放弃控制。否则,切换线程的时机就由Python解释器来决定,通常是在执行了一定数量的字节码指令后才会切换线程。」

有人能确认这里是Python的问题吗?有没有其他方法可以让DumpThread释放控制权?还有其他想法吗?

3 个回答

0

我觉得这个供应商说得对。如果这是CPython的话,实际上是没有真正的并行线程的;一次只能有一个线程在执行。这是因为有一个叫做全局解释器锁的东西在起作用。

不过,你可以试试使用多进程模块,这个模块可以通过创建真正的子进程来绕过垃圾回收器的锁,从而实现一个不错的解决方案。

还有一种可能的办法是修改调度器的切换行为

2

我假设你写了一个Python的C模块,这个模块里有一个叫ReadUSBDevice的函数,目的是在接收到USB数据包之前一直等待,然后再返回这个数据包。

这个ReadUSBDevice的实现需要在等待USB数据包的时候释放Python的全局解释器锁(GIL),等收到数据包后再重新获取这个锁。这样做的好处是,其他的Python线程在你执行本地代码的时候也能继续运行。

http://docs.python.org/c-api/init.html#thread-state-and-the-global-interpreter-lock

在你释放了GIL的时候,是无法访问Python的。你需要先释放GIL,运行那个阻塞的函数,然后等你确认有数据可以返回给Python时,再重新获取GIL。

如果你不这样做,那么在你的本地代码阻塞的时候,其他的Python线程就无法执行。如果这个模块是由供应商提供的,而在本地阻塞时没有释放GIL,那就是个bug。

需要注意的是,如果你接收了很多数据包,并且在Python中处理它们,那么其他线程应该还是能运行的。虽然多个线程在运行Python代码时不会并行执行,但它们会频繁切换,给每个线程一个运行的机会。如果本地代码在阻塞而没有释放GIL,这种切换就无法进行。

编辑:我看到你提到这是一个供应商提供的库。如果你没有源代码,可以快速检查他们是否释放了GIL:在没有USB活动的时候启动ReadUSBDevice线程,这样ReadUSBDevice就会一直等待数据。如果他们释放了GIL,其他线程应该可以顺利运行。如果没有释放,就会阻塞整个解释器。这将是一个严重的bug。

3

如果你的代码是纯Python的,那你的供应商说得没错;不过,C扩展可以释放全局解释器锁(GIL),这样就能真正实现多线程。

特别是,time.sleep确实会释放GIL(你可以直接从源代码中查看,这里 - 看看floatsleep的实现);所以你的代码应该不会有问题。为了进一步证明,我做了一个简单的测试,只是去掉了对USB的调用,结果确实如预期那样工作:

from threading import Thread
import time
import sys

usb_device = 0

def usb_dump():
    for i in range(100):
        time.sleep(0.001)
        print "dumping usb"

class DumpThread(Thread):
    def run(self):
        usb_dump()

t = DumpThread()
t.start()
print "Sleep 1"
time.sleep(1)
print "End"
sys.exit(0)

最后,关于你发布的代码,有几点需要注意:

  • usb_device没有传递给线程。你需要把它作为参数传递,或者(唉!)告诉线程从全局命名空间获取它。
  • 与其强制调用sys.exit(),不如更好地只是给线程发送一个停止的信号,然后再关闭USB设备。我怀疑你的代码现在可能会有一些多线程的问题。
  • 如果你只是需要定期检查,使用threading.Timer类可能是更好的解决方案。

[更新] 关于最后一点:正如评论中所说,我认为Timer更符合你函数的语义(定期检查),并且可以自动避免由于供应商代码没有释放GIL而引发的问题。

撰写回答