如何在Python中关闭线程?

3 投票
3 回答
14368 浏览
提问于 2025-04-17 13:50

我在处理线程时遇到了一些问题,感觉有太多线程没有完成。

我觉得队列的命令 .join() 只是关闭了队列,而没有关闭正在使用它的线程。

在我的脚本中,我需要检查28万个域名,并为每个域名获取它的MX记录列表,还要获取服务器的IPv6地址(如果有的话)。

我使用了线程,这让我的脚本运行速度快了很多。但是有个问题,虽然我用到了join()来处理队列,但活跃的线程数量却在不断增加,直到出现错误,提示无法创建新的线程(可能是操作系统的限制?)。

我该如何在每次从数据库获取新域名的For循环后,终止/关闭/停止/重置这些线程呢?

线程类的定义...

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()

线程类的使用情况... (这里没有主For循环,这只是它的一部分)

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"

我尝试过:

for thread in threads:
            thread.join()

在队列完成后,但 thread.join() 从来没有停止等待,尽管实际上没有必要等待,因为当 queue.join() 执行时,线程没有任何事情可做。

3 个回答

2

把线程连接起来就能解决问题,但在你的情况下,连接是一直阻塞的,因为你的线程从来没有退出运行循环。你需要让运行的方法结束,这样线程才能被连接。

5

我经常在我的线程里用无限循环的时候,把条件改成可以从外部控制的东西。比如这样:

def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff

这样一来,我就可以从外部改变 keepRunning 这个属性,把它设成 false,这样线程在下次检查循环条件的时候就能优雅地结束了。

顺便说一下,既然你每放一个项目进队列就会启动一个线程,其实你根本不需要让线程一直循环。不过我认为你应该始终限制一下可以创建的线程数量(比如 for i in range(min(len(answers), MAX_THREAD_COUNT)):)。

另一种方法

在你的情况下,不需要在每次 for 循环迭代中结束线程,你可以直接重用这些线程。从你线程的源代码来看,唯一让一个线程在某次迭代中独特的就是你在创建时设置的 id_domain 属性。不过你也可以把这个属性和你的队列一起提供,这样线程就完全独立了,你可以重用它们。

这可能看起来像这样:

qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()

for thread in threads:
    thread.keepRunning = False

当然,你需要稍微改一下你的线程:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff
4

我不明白你为什么一开始就需要一个Queue(队列)。
毕竟在你的设计中,每个线程只处理一个任务。
你应该可以在创建线程的时候就把那个任务传给它。
这样你就不需要Queue了,也能省去while循环:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

这样你就可以去掉run方法里的while循环:

def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5

    try:
        answers = res.query(self.mx,'AAAA')
        ip_mx = str(answers[0])
    except:
        ip_mx = "N/A"

    with lock:
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()

        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

为每个任务创建一个线程

for mx in answers:
    t = MX_getAAAA_thread(qMX, id_domain, mx)
    t.setDaemon(True)
    threads.append(t)
    t.start()

然后把它们合并在一起

for thread in threads:
    thread.join()

撰写回答