Python中的正确线程处理

1 投票
2 回答
1626 浏览
提问于 2025-04-17 15:39

我正在写一个家庭自动化助手——它们基本上是一些像守护进程一样的小型Python应用程序。每个助手可以作为一个独立的进程运行,但因为会有很多这样的助手,所以我决定搭建一个小的调度器,让每个助手在自己的线程中运行,并且能够在将来某个线程崩溃时进行处理。

这就是它的样子(使用了两个类):

from daemons import mosquitto_daemon, gtalk_daemon
from threading import Thread

print('Starting daemons')
mq_client = mosquitto_daemon.Client()
gt_client = gtalk_daemon.Client()

print('Starting MQ')
mq = Thread(target=mq_client.run)
mq.start()

print('Starting GT')
gt = Thread(target=gt_client.run)
gt.start()

while mq.isAlive() and gt.isAlive():
    pass
print('something died')

问题是,MQ守护进程(moquitto)直接运行时一切正常:

mq_client = mosquitto_daemon.Client()
mq_client.run()

它会启动并保持运行,监听所有相关主题的消息——这正是我想要的。

然而,在调度器中运行时,它的表现就有点奇怪——它会接收到一条消息,然后就停止工作,尽管线程仍然显示是活着的。考虑到它在没有线程的情况下运行得很好,我猜想我在调度器中做错了什么。

我引用一下MQ客户端的代码,以防万一:

import mosquitto
import config
import sys
import logging


class Client():
    mc = None

    def __init__(self):
        logging.basicConfig(format=u'%(filename)s:%(lineno)d %(levelname)-8s [%(asctime)s]  %(message)s', level=logging.DEBUG)
        logging.debug('Class initialization...')
        if not Client.mc:
            logging.info('Creating an instance of MQ client...')
            try:
                Client.mc = mosquitto.Mosquitto(config.DEVICE_NAME)
                Client.mc.connect(host=config.MQ_BROKER_ADDRESS)
                logging.debug('Successfully created MQ client...')
                logging.debug('Subscribing to topics...')
                for topic in config.MQ_TOPICS:
                    result, some_number = Client.mc.subscribe(topic, 0)
                    if result == 0:
                        logging.debug('Subscription to topic "%s" successful' % topic)
                    else:
                        logging.error('Failed to subscribe to topic "%s": %s' % (topic, result))
                logging.debug('Settings up callbacks...')
                self.mc.on_message = self.on_message
                logging.info('Finished initialization')
            except Exception as e:
                logging.critical('Failed to complete creating MQ client: %s' % e.message)
                self.mc = None
        else:
            logging.critical('Instance of MQ Client exists - passing...')
            sys.exit(status=1)

    def run(self):
        self.mc.loop_forever()

    def on_message(self, mosq, obj, msg):
        print('meesage!!111')
        logging.info('Message received on topic %s: %s' % (msg.topic, msg.payload))

2 个回答

1

有几个事情需要注意:

  1. zeromq不喜欢在一个线程中初始化然后在另一个线程中运行。你可以把Client()改成一个线程,或者写一个自己的函数来创建一个Client,然后在一个线程中运行这个函数。

  2. Client()里面有一个类级别的变量mc。我猜mosquitto_daemon和gtalk_daemon都在使用同一个Client,所以它们在争夺哪个Client.mc会胜出。

  3. "while mq.isAlive() and gt.isAlive(): pass"会占用整个处理器,因为它一直在不停地检查,没有休息。考虑到Python的线程机制(全局解释器锁(GIL)只允许一个线程在同一时间运行),这会让你的“守护进程”停滞不前。

  4. 再考虑到GIL,原来的守护进程实现可能会表现得更好。

2

你正在把一个类的 run 方法传给 Thread,但是 Thread 并不知道该怎么处理这个方法。

threading.Thread 有两种主要的用法:一种是创建一个独立的函数作为线程,另一种是作为一个类的基类,这个类里面有一个 run 方法。在你的情况下,使用基类的方式更合适,因为你的 Client 类里面有一个 run 方法。

你可以在你的 MQ 类中替换以下内容,这样应该就能正常工作了:

from threading import Thread

class Client(Thread):
    mc = None

    def __init__(self):
        Thread.__init__(self) # initialize the Thread instance
        ...
    ...

    def stop(self):
        # some sort of command to stop mc
        self.mc.stop() # not sure what the actual command is, if one exists at all...

然后在调用的时候,不要使用 Thread

mq_client = mosquitto_daemon.Client()
mq_client.start() 

print 'Print this line to be sure we get here after starting the thread loop...'

撰写回答