Python中的正确线程处理
我正在写一个家庭自动化助手——它们基本上是一些像守护进程一样的小型Python应用程序。每个助手可以作为一个独立的进程运行,但因为会有很多这样的助手,所以我决定搭建一个小的调度器,让每个助手在自己的线程中运行,并且能够在将来某个线程崩溃时进行处理。
这就是它的样子(使用了两个类):
from daemons import mosquitto_daemon, gtalk_daemon
from threading import Thread
print('Starting daemons')
mq_client = mosquitto_daemon.Client()
gt_client = gtalk_daemon.Client()
print('Starting MQ')
mq = Thread(target=mq_client.run)
mq.start()
print('Starting GT')
gt = Thread(target=gt_client.run)
gt.start()
while mq.isAlive() and gt.isAlive():
pass
print('something died')
问题是,MQ守护进程(moquitto)直接运行时一切正常:
mq_client = mosquitto_daemon.Client()
mq_client.run()
它会启动并保持运行,监听所有相关主题的消息——这正是我想要的。
然而,在调度器中运行时,它的表现就有点奇怪——它会接收到一条消息,然后就停止工作,尽管线程仍然显示是活着的。考虑到它在没有线程的情况下运行得很好,我猜想我在调度器中做错了什么。
我引用一下MQ客户端的代码,以防万一:
import mosquitto
import config
import sys
import logging
class Client():
mc = None
def __init__(self):
logging.basicConfig(format=u'%(filename)s:%(lineno)d %(levelname)-8s [%(asctime)s] %(message)s', level=logging.DEBUG)
logging.debug('Class initialization...')
if not Client.mc:
logging.info('Creating an instance of MQ client...')
try:
Client.mc = mosquitto.Mosquitto(config.DEVICE_NAME)
Client.mc.connect(host=config.MQ_BROKER_ADDRESS)
logging.debug('Successfully created MQ client...')
logging.debug('Subscribing to topics...')
for topic in config.MQ_TOPICS:
result, some_number = Client.mc.subscribe(topic, 0)
if result == 0:
logging.debug('Subscription to topic "%s" successful' % topic)
else:
logging.error('Failed to subscribe to topic "%s": %s' % (topic, result))
logging.debug('Settings up callbacks...')
self.mc.on_message = self.on_message
logging.info('Finished initialization')
except Exception as e:
logging.critical('Failed to complete creating MQ client: %s' % e.message)
self.mc = None
else:
logging.critical('Instance of MQ Client exists - passing...')
sys.exit(status=1)
def run(self):
self.mc.loop_forever()
def on_message(self, mosq, obj, msg):
print('meesage!!111')
logging.info('Message received on topic %s: %s' % (msg.topic, msg.payload))
2 个回答
有几个事情需要注意:
zeromq不喜欢在一个线程中初始化然后在另一个线程中运行。你可以把Client()改成一个线程,或者写一个自己的函数来创建一个Client,然后在一个线程中运行这个函数。
Client()里面有一个类级别的变量mc。我猜mosquitto_daemon和gtalk_daemon都在使用同一个Client,所以它们在争夺哪个Client.mc会胜出。
"while mq.isAlive() and gt.isAlive(): pass"会占用整个处理器,因为它一直在不停地检查,没有休息。考虑到Python的线程机制(全局解释器锁(GIL)只允许一个线程在同一时间运行),这会让你的“守护进程”停滞不前。
再考虑到GIL,原来的守护进程实现可能会表现得更好。
你正在把一个类的 run
方法传给 Thread
,但是 Thread
并不知道该怎么处理这个方法。
threading.Thread
有两种主要的用法:一种是创建一个独立的函数作为线程,另一种是作为一个类的基类,这个类里面有一个 run
方法。在你的情况下,使用基类的方式更合适,因为你的 Client
类里面有一个 run
方法。
你可以在你的 MQ
类中替换以下内容,这样应该就能正常工作了:
from threading import Thread
class Client(Thread):
mc = None
def __init__(self):
Thread.__init__(self) # initialize the Thread instance
...
...
def stop(self):
# some sort of command to stop mc
self.mc.stop() # not sure what the actual command is, if one exists at all...
然后在调用的时候,不要使用 Thread
:
mq_client = mosquitto_daemon.Client()
mq_client.start()
print 'Print this line to be sure we get here after starting the thread loop...'