终止多线程Python程序

74 投票
7 回答
95977 浏览
提问于 2025-04-15 15:26

如何让一个多线程的Python程序对Ctrl+C键事件做出响应?

编辑:代码是这样的:

import threading
current = 0

class MyThread(threading.Thread):
    def __init__(self, total):
        threading.Thread.__init__(self)
        self.total = total

    def stop(self):
        self._Thread__stop()

    def run(self):
        global current
        while current<self.total:
            lock = threading.Lock()
            lock.acquire()
            current+=1
            lock.release()
            print current

if __name__=='__main__':

    threads = []
    thread_count = 10
    total = 10000
    for i in range(0, thread_count):
        t = MyThread(total)
        t.setDaemon(True)
        threads.append(t)
    for i in range(0, thread_count):
        threads[i].start()

我尝试去掉所有线程上的join(),但还是不行。是不是因为每个线程的run()过程里面有锁的部分?

编辑:上面的代码应该是可以工作的,但当当前变量在5000到6000之间时,总是会被中断,并且出现如下错误:

Exception in thread Thread-4 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.5/threading.py", line 486, in __bootstrap_inner
  File "test.py", line 20, in run
<type 'exceptions.TypeError'>: unsupported operand type(s) for +=: 'NoneType' and 'int'
Exception in thread Thread-2 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.5/threading.py", line 486, in __bootstrap_inner
  File "test.py", line 22, in run

7 个回答

6

一个Worker可能对你有帮助:

#!/usr/bin/env python

import sys, time
from threading import *
from collections import deque

class Worker(object):
    def __init__(self, concurrent=1):
        self.concurrent = concurrent
        self.queue = deque([])
        self.threads = []
        self.keep_interrupt = False

    def _retain_threads(self):
        while len(self.threads) < self.concurrent:
            t = Thread(target=self._run, args=[self])
            t.setDaemon(True)
            t.start()
            self.threads.append(t)


    def _run(self, *args):
        while self.queue and not self.keep_interrupt:
            func, args, kargs = self.queue.popleft()
            func(*args, **kargs)

    def add_task(self, func, *args, **kargs):
        self.queue.append((func, args, kargs))

    def start(self, block=False):
        self._retain_threads()

        if block:
            try:
                while self.threads:
                    self.threads = [t.join(1) or t for t in self.threads if t.isAlive()]
                    if self.queue:
                        self._retain_threads()
            except KeyboardInterrupt:
                self.keep_interrupt = True
                print "alive threads: %d; outstanding tasks: %d" % (len(self.threads), len(self.queue))
                print "terminating..."


# example
print "starting..."
worker = Worker(concurrent=50)

def do_work():
    print "item %d done." % len(items)
    time.sleep(3)

def main():
    for i in xrange(1000):
        worker.add_task(do_work)
    worker.start(True)

main()
print "done."

# to keep shell alive
sys.stdin.readlines()
16

有两种主要的方法,一种比较干净,另一种比较简单。

干净的方法是,在你的主线程中捕捉到键盘中断(也就是你按下Ctrl+C的时候),然后设置一个标志,让你的后台线程可以检查这个标志,这样它们就知道该退出了。下面是一个简单但稍微有点乱的版本,使用了全局变量:

exitapp = False
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        exitapp = True
        raise

def threadCode(...):
    while not exitapp:
        # do work here, watch for exitapp to be True

而简单但有点乱的方法是捕捉到键盘中断后,直接调用os._exit(),这样会立即终止所有线程。

96

让除了主线程以外的每个线程都成为守护线程(在2.6或更高版本中使用t.daemon = True,在2.6或更低版本中使用t.setDaemon(True),在启动每个线程对象t之前)。这样,当主线程接收到键盘中断(KeyboardInterrupt)时,如果它没有处理这个中断,或者处理了但决定还是要终止,整个程序就会结束。详细信息可以查看文档

编辑:刚看到提问者的代码(最初没有发布)以及“它不工作”的说法,我需要补充一下…:

当然,如果你希望主线程保持响应(例如,能够处理Ctrl+C),就不要让它陷入阻塞调用,比如join另一个线程——尤其是那些完全无用的阻塞调用,比如join守护线程。比如,可以把主线程的最后一个循环从当前的(毫无意义且有害的):

for i in range(0, thread_count):
    threads[i].join()

改成更合理的:

while threading.active_count() > 0:
    time.sleep(0.1)

如果你的主线程没有更好的事情可做,那就等所有线程自己结束,或者等待接收到Ctrl+C(或其他信号)。

当然,还有很多其他可用的模式,如果你希望你的线程不被突然终止(像守护线程可能会那样)——除非它们也被永远困在无条件的阻塞调用、死锁等情况中;-)。

撰写回答