Python 线程设计问题(与一个永久阻塞的方法协作)

0 投票
3 回答
537 浏览
提问于 2025-04-16 11:49

更新内容

我找到了解决我之前问题的方法,具体可以参考这个链接:在Python中有没有办法终止一个线程?

我想对我的设计得到一些反馈。我们有一个比较大的系统,其中一个部分是通信组件。这个组件负责两件事:一是发送消息,二是将收到的消息进行排队(可以是文件、数据库、队列等)。我通过一个提供的库来接收消息,这个库基本上可以无限期地等待消息到达。

我有几个问题:

1)最好是有一个主线程和两个子线程吗?

2)让接收线程一直阻塞,直到消息到达,这样好吗?还是应该让它有个超时(这算是个例外),然后继续循环?

如果你需要更多信息,请告诉我。

现在我基本上有一个接收线程和一个主线程(主线程也负责发送),你可以在下面看到。下面唯一没有提到的是我现在可以调用“terminate”来结束MessageReceiver。


原始内容

我正在尝试在Python中围绕一些接收逻辑创建一个线程。基本上,我们有一个应用程序,它会在后台运行一个线程来轮询消息,我遇到的问题是,实际拉取消息的那部分会无限期等待消息到来。这让它无法终止……我最终把拉取消息的部分放在另一个线程里,但我想确认这样做是否是最好的方法。

原始代码:

class Manager:
   def __init__(self):
        receiver = MessageReceiver()
        receiver.start()
        #do other stuff sending/etc...

class MessageReceiver(Thread):
   receiver = Receiver()

   def __init__(self):
        Thread.__init__(self)


   def run(self):           
       #stop is a flag that i use to stop the thread...
       while(not stopped ):
          #can never stop because pull below blocks
          message  = receiver.pull()
          print "Message" + message   

我知道明显存在锁定问题,但这样控制一个永远等待消息的接收线程合适吗?

我注意到的一件事是,这个线程在等待消息时会占用100%的CPU……

如果你需要看到停止逻辑,请告诉我,我会发上来。

3 个回答

0

1) 如果你的第三个线程只是用来等待接收器和发送器,那就没必要创建它。

处理多个进程比处理多个线程要麻烦一些。进程的一个大优点是它们可以避免CPython在多线程方面的限制,也就是说,两个线程不能同时进行处理(不过一个线程可以在其他线程等待输入输出时运行)。所以,除非这两个线程都需要进行大量处理,否则把它们放在同一个进程里会更好。

2) 你应该让接收器有个超时设置,并在循环中检查是否有终止的标志。

0

哪个函数最终会等待消息的到来呢?很可能有一种方法可以设置超时,这样就可以定期检查一个结束标志。

0

因为这个问题从来没有一个真正的答案,所以我来总结一下:

  1. 有一个主线程负责发送消息,并且会创建一个接收线程。
  2. 这个接收线程会一直阻塞,不会超时,因为抛出异常会很耗费资源,所以没必要让它超时。

为了停止这个线程,我基本上是设置一个停止标志,然后在父线程中发送一个“终止”的消息来关闭底层连接。这样会导致子线程抛出一个异常。

在处理异常的地方,我会检查这个连接异常,如果“停止”标志被设置了,我就会优雅地停止接收线程。如果是非正常退出,我会通知父线程。

希望这对你有帮助,这个问题真是让人烦,但我很高兴它终于解决了。

class Manager:
   def __init__(self):
      self.receiver = MessageReceiver(shutdown_hook = self.shutdown_hook)
      self.receiver.start()
      #do other stuff sending/etc...
   def terminate(self):
      self.receiver.stop()
      self.receiver.close()

   def shutdown_hook(self, t_id, child):
       print '%s - Unexpected thread shutdown, handle this.. restart thread?' % str(t_id))

class MessageReceiver(Thread):


   def __init__(self,shutdown_hook = None):
     Thread.__init__(self)
     self.receiver = Receiver()
     self.shutdown_hook = shutdown_hook 


   def run(self):           
     #stop is a flag that i use to stop the thread...
     while(not stopped ):
      try:
        message  = self.receiver.pull()
        print "Message" + message   
      except ConnectionException as e:
         if stopped:
            #we are in the process of stopping
            pass
         else:
            self.shutdown_hook(self.iden, e)
            break
      finally:
            receiver.close()

撰写回答