如何在类中处理Python的asyncore,而不阻塞任何东西?

15 投票
5 回答
10356 浏览
提问于 2025-04-17 13:25

我需要创建一个类,用来接收和存储SMTP消息,也就是电子邮件。为此,我参考了一个例子,使用了asyncore,你可以在这里找到。不过,asyncore.loop()是阻塞的,这意味着在它运行的时候,我的代码就不能做其他事情了。

所以我想到了使用线程。下面是一个示例代码,展示了我的想法:

class MyServer(smtpd.SMTPServer):
    # derive from the python server class

    def process_message(..):
        # overwrite a smtpd.SMTPServer method to be able to handle the received messages
        ...
        self.list_emails.append(this_email)

    def get_number_received_emails(self):
        """Return the current number of stored emails"""
        return len(self.list_emails)


    def start_receiving(self):
        """Start the actual server to listen on port 25"""

        self.thread =   threading.Thread(target=asyncore.loop)
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver from itself
        self.close()
        self.thread.join()

我希望你能明白我的意思。这个类MyServer应该能够以非阻塞的方式开始和停止监听25号端口,并且在监听的时候(或者不监听的时候)能够查询消息。start方法会启动asyncore.loop()监听器,当接收到电子邮件时,它会把邮件添加到一个内部列表中。类似地,stop方法应该能够停止这个服务器,具体可以参考这里的建议。

尽管这段代码没有按我预期的那样工作(asyncore似乎会一直运行,即使我调用了上面的stop方法。抛出的errorstop中被捕获,但在包含asyncore.loop()target函数中却没有),我不确定我的解决方案是否合理。任何修复上述代码的建议,或者提出更稳健的实现方案(使用第三方软件),都非常欢迎。

5 个回答

3

你可以考虑使用Twisted这个工具。这个链接展示了如何搭建一个SMTP服务器,并且可以自定义在邮件送达时的处理方式。

4

来自另一个问题 asyncore.loop 在没有更多连接时不会结束

我觉得你对线程的理解有点过于复杂了。使用另一个问题中的代码,你可以通过下面这段代码来启动一个新的线程,让它运行 asyncore.loop

import threading

loop_thread = threading.Thread(target=asyncore.loop, name="Asyncore Loop")
# If you want to make the thread a daemon
# loop_thread.daemon = True
loop_thread.start()

这样做会在一个新线程中运行这个代码,并且会一直运行,直到所有的 asyncore 通道都关闭为止。

17

这个解决方案可能不是最复杂的,但它效果还不错,并且经过测试。

首先,关于 asyncore.loop() 的问题是,它会一直阻塞,直到所有的 asyncore 通道都关闭,正如用户 Wessie 在之前的评论中提到的。提到的 smtp 示例 中,smtpd.SMTPServer 是从 asyncore.dispatcher 继承而来的(具体可以查看 smtpd 文档),这就回答了哪个通道需要关闭的问题。

因此,原来的问题可以用以下更新的示例代码来回答:

class CustomSMTPServer(smtpd.SMTPServer):
    # store the emails in any form inside the custom SMTP server
    emails = []
    # overwrite the method that is used to process the received 
    # emails, putting them into self.emails for example
    def process_message(self, peer, mailfrom, rcpttos, data):
        # email processing


class MyReceiver(object):
    def start(self):
        """Start the listening service"""
        # here I create an instance of the SMTP server, derived from  asyncore.dispatcher
        self.smtp = CustomSMTPServer(('0.0.0.0', 25), None)
        # and here I also start the asyncore loop, listening for SMTP connection, within a thread
        # timeout parameter is important, otherwise code will block 30 seconds after the smtp channel has been closed
        self.thread =  threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} )
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver to ensure no channels connect to asyncore
        self.smtp.close()
        # now it is save to wait for the thread to finish, i.e. for asyncore.loop() to exit
        self.thread.join()

    # now it finally it is possible to use an instance of this class to check for emails or whatever in a non-blocking way
    def count(self):
        """Return the number of emails received"""
        return len(self.smtp.emails)        
    def get(self):
        """Return all emails received so far"""
        return self.smtp.emails
    ....

最后,我有一个 start 方法和一个 stop 方法,用于在非阻塞的环境中开始和停止监听 25 端口。

撰写回答