通过Python的SpamAssassin不总是工作吗?

2024-05-16 02:42:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我通过smtpd设置了一个由Python脚本管理的邮件服务器,由于收到了一些垃圾邮件,我决定将SpamAssassin连接到它。在

因为我找不到一个连接到SpamAssassin的Python代码来获得分数,所以我自己用在网上找到的一些代码构建了它。代码如下:

# -*- config:utf-8 -*-

import socket, select, re, logging
from io import BytesIO


divider_pattern = re.compile(br'^(.*?)\r?\n(.*?)\r?\n\r?\n', re.DOTALL)
first_line_pattern = re.compile(br'^SPAMD/[^ ]+ 0 EX_OK$')


# @see https://github.com/slimta/python-slimta/blob/master/slimta/policy/spamassassin.py
class SpamAssassin(object):
    def __init__(self, message, timeout=15):
        self.score = None
        self.symbols = None

        # Connecting
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.settimeout(timeout)
        client.connect(('127.0.0.1', 783))

        # Sending
        client.sendall(self._build_message(message))
        client.shutdown(socket.SHUT_WR)

        # Reading
        resfp = BytesIO()
        while True:
            ready = select.select([client], [], [], timeout)
            if ready[0] is None:
                # Kill with Timeout!
                logging.info('[SpamAssassin] - Timeout ({0}s)!'.format(str(timeout)))
                break

            data = client.recv(4096)
            if data == b'':
                break

            resfp.write(data)

        # Closing
        client.close()
        client = None

        self._parse_response(resfp.getvalue())

    def _build_message(self, message):
        reqfp = BytesIO()
        data_len = str(len(message)).encode()
        reqfp.write(b'SYMBOLS SPAMC/1.2\r\n')
        reqfp.write(b'Content-Length: ' + data_len + b'\r\n')
        reqfp.write(b'User: cx42\r\n\r\n')
        reqfp.write(message)
        return reqfp.getvalue()

    def _parse_response(self, response):
        if response == b'':
            logging.info("[SPAM ASSASSIN] Empty response")
            return None

        match = divider_pattern.match(response)
        if not match:
            logging.error("[SPAM ASSASSIN] Response error:")
            logging.error(response)
            return None

        first_line = match.group(1)
        headers = match.group(2)
        body = response[match.end(0):]

        # Checking response is good
        match = first_line_pattern.match(first_line)
        if not match:
            logging.error("[SPAM ASSASSIN] invalid response:")
            logging.error(first_line)
            return None

        self.symbols = [s.decode('ascii').strip() for s in body.strip().split(',')]

        headers = headers.replace(' ', '').replace(':', ';').replace('/', ';').split(';')
        self.score = float(headers[2])

    def get_score(self):
        return self.score

    def get_symbols(self):
        return self.symbols

    def is_spam(self, level=5):
        return self.score is None or self.score >= level

在我的服务器脚本中,我有以下部分要检查垃圾邮件:

^{pr2}$

当代码不符合if条件时,将发送邮件。在

我面临的最大问题是有些电子邮件被发送,尽管它们被认为是来自SpamAssassin的垃圾邮件。在

我知道这一点是因为我构建了第二个脚本,从postfix加载队列(对于JSON格式,通过postqueue -j),并从下面的代码中执行SpamAssassin检查。相当多的电子邮件被检测为垃圾邮件。 (为了不显示太多代码,加载后缀队列并清理它的是here)。在

我不知道这里显示的代码有什么问题,我的Python代码怎么可能允许发送spam。在

我检查了日志,代码中没有任何异常(比如SpamAssassin的超时或其他任何异常)。在

对我来说,问题是条件if assassin.is_spam(),返回False,而在某些情况下它应该返回True,但我不知道如何/为什么/何时,所以我向您寻求帮助。在

我的理论是:

  • 可能套接字正在重新使用某个缓存版本,该版本在新的电子邮件中为SpamAssassin返回False,但未通过正确的检查
  • 插座有点奇怪/垃圾邮件.py文件,因为这是唯一不起作用的地方。在
  • 可能是并发问题?因为在服务器上有许多请求,所以可能会为多个传入请求打开一个套接字,然后为所有传入请求读取第一个结果,接受不应该的邮件?在

Tags: 代码selfclientnonemessagereturnifresponse