Python 线程设计问题(与一个永久阻塞的方法协作)
更新内容
我找到了解决我之前问题的方法,具体可以参考这个链接:在Python中有没有办法终止一个线程?
我想对我的设计得到一些反馈。我们有一个比较大的系统,其中一个部分是通信组件。这个组件负责两件事:一是发送消息,二是将收到的消息进行排队(可以是文件、数据库、队列等)。我通过一个提供的库来接收消息,这个库基本上可以无限期地等待消息到达。
我有几个问题:
1)最好是有一个主线程和两个子线程吗?
2)让接收线程一直阻塞,直到消息到达,这样好吗?还是应该让它有个超时(这算是个例外),然后继续循环?
如果你需要更多信息,请告诉我。
现在我基本上有一个接收线程和一个主线程(主线程也负责发送),你可以在下面看到。下面唯一没有提到的是我现在可以调用“terminate”来结束MessageReceiver。
原始内容
我正在尝试在Python中围绕一些接收逻辑创建一个线程。基本上,我们有一个应用程序,它会在后台运行一个线程来轮询消息,我遇到的问题是,实际拉取消息的那部分会无限期等待消息到来。这让它无法终止……我最终把拉取消息的部分放在另一个线程里,但我想确认这样做是否是最好的方法。
原始代码:
class Manager:
def __init__(self):
receiver = MessageReceiver()
receiver.start()
#do other stuff sending/etc...
class MessageReceiver(Thread):
receiver = Receiver()
def __init__(self):
Thread.__init__(self)
def run(self):
#stop is a flag that i use to stop the thread...
while(not stopped ):
#can never stop because pull below blocks
message = receiver.pull()
print "Message" + message
我知道明显存在锁定问题,但这样控制一个永远等待消息的接收线程合适吗?
我注意到的一件事是,这个线程在等待消息时会占用100%的CPU……
如果你需要看到停止逻辑,请告诉我,我会发上来。
3 个回答
1) 如果你的第三个线程只是用来等待接收器和发送器,那就没必要创建它。
处理多个进程比处理多个线程要麻烦一些。进程的一个大优点是它们可以避免CPython在多线程方面的限制,也就是说,两个线程不能同时进行处理(不过一个线程可以在其他线程等待输入输出时运行)。所以,除非这两个线程都需要进行大量处理,否则把它们放在同一个进程里会更好。
2) 你应该让接收器有个超时设置,并在循环中检查是否有终止的标志。
哪个函数最终会等待消息的到来呢?很可能有一种方法可以设置超时,这样就可以定期检查一个结束标志。
因为这个问题从来没有一个真正的答案,所以我来总结一下:
- 有一个主线程负责发送消息,并且会创建一个接收线程。
- 这个接收线程会一直阻塞,不会超时,因为抛出异常会很耗费资源,所以没必要让它超时。
为了停止这个线程,我基本上是设置一个停止标志,然后在父线程中发送一个“终止”的消息来关闭底层连接。这样会导致子线程抛出一个异常。
在处理异常的地方,我会检查这个连接异常,如果“停止”标志被设置了,我就会优雅地停止接收线程。如果是非正常退出,我会通知父线程。
希望这对你有帮助,这个问题真是让人烦,但我很高兴它终于解决了。
class Manager:
def __init__(self):
self.receiver = MessageReceiver(shutdown_hook = self.shutdown_hook)
self.receiver.start()
#do other stuff sending/etc...
def terminate(self):
self.receiver.stop()
self.receiver.close()
def shutdown_hook(self, t_id, child):
print '%s - Unexpected thread shutdown, handle this.. restart thread?' % str(t_id))
class MessageReceiver(Thread):
def __init__(self,shutdown_hook = None):
Thread.__init__(self)
self.receiver = Receiver()
self.shutdown_hook = shutdown_hook
def run(self):
#stop is a flag that i use to stop the thread...
while(not stopped ):
try:
message = self.receiver.pull()
print "Message" + message
except ConnectionException as e:
if stopped:
#we are in the process of stopping
pass
else:
self.shutdown_hook(self.iden, e)
break
finally:
receiver.close()