Python 日志记录到数据库

60 投票
5 回答
65912 浏览
提问于 2025-04-15 19:36

我想找到一种方法,让Python的日志记录模块可以把日志记录到数据库里。如果数据库出现问题,就自动切换到文件系统来记录日志。

简单来说,有两个要点:第一,怎么让日志记录器把日志记录到数据库;第二,怎么在数据库出问题时,让它转而记录到文件里。

5 个回答

11

用Python把日志记录到数据库,并设置备份记录器


问题

我在服务器上运行Django项目时遇到了同样的问题,有时候需要远程查看日志。


解决方案

首先,需要为记录器设置一个处理器,用来把日志插入到数据库里。在这之前,由于我的SQL水平不太好,所以我选择了一个ORM工具,也就是SQLAlchemy

模型:

# models.py
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
import datetime

base = declarative_base()


class Log(base):
    __tablename__ = "log"
    id = Column(Integer, primary_key=True, autoincrement=True)
    time = Column(DateTime, nullable=False, default=datetime.datetime.now)
    level_name = Column(String(10), nullable=True)
    module = Column(String(200), nullable=True)
    thread_name = Column(String(200), nullable=True)
    file_name = Column(String(200), nullable=True)
    func_name = Column(String(200), nullable=True)
    line_no = Column(Integer, nullable=True)
    process_name = Column(String(200), nullable=True)
    message = Column(Text)
    last_line = Column(Text)

这是插入数据库的基本操作:

#crud.py
import sqlalchemy
from .models import base
from traceback import print_exc


class Crud:
    def __init__(self, connection_string=f'sqlite:///log_db.sqlite3',
                 encoding='utf-8',
                 pool_size=10,
                 max_overflow=20,
                 pool_recycle=3600):

        self.connection_string = connection_string
        self.encoding = encoding
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_recycle = pool_recycle
        self.engine = None
        self.session = None

    def initiate(self):
        self.create_engine()
        self.create_session()
        self.create_tables()

    def create_engine(self):
        self.engine = sqlalchemy.create_engine(self.connection_string)

    def create_session(self):
        self.session = sqlalchemy.orm.Session(bind=self.engine)

    def create_tables(self):
        base.metadata.create_all(self.engine)

    def insert(self, instances):
        try:
            self.session.add(instances)
            self.session.commit()
            self.session.flush()
        except:
            self.session.rollback()
            raise

    def __del__(self):
        self.close_session()
        self.close_all_connections()

    def close_session(self):
        try:
            self.session.close()
        except:
            print_exc()
        else:
            self.session = None

    def close_all_connections(self):
        try:
            self.engine.dispose()
        except:
            print_exc()
        else:
            self.engine = None

处理器:

# handler.py
from logging import Handler, getLogger
from traceback import print_exc
from .crud import Crud
from .models import Log


my_crud = Crud(
    connection_string=<connection string to reach your db>,
    encoding='utf-8',
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600)

my_crud.initiate()


class DBHandler(Handler):
    backup_logger = None

    def __init__(self, level=0, backup_logger_name=None):
        super().__init__(level)
        if backup_logger_name:
            self.backup_logger = getLogger(backup_logger_name)

    def emit(self, record):
        try:
            message = self.format(record)
            try:
                last_line = message.rsplit('\n', 1)[-1]
            except:
                last_line = None

            try:
                new_log = Log(module=record.module,
                              thread_name=record.threadName,
                              file_name=record.filename,
                              func_name=record.funcName,
                              level_name=record.levelname,
                              line_no=record.lineno,
                              process_name=record.processName,
                              message=message,
                              last_line=last_line)
                # raise

                my_crud.insert(instances=new_log)
            except:
                if self.backup_logger:
                    try:
                        getattr(self.backup_logger, record.levelname.lower())(record.message)
                    except:
                        print_exc()
                else:
                    print_exc()

        except:
            print_exc()

测试记录器:

# test.py
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from .handlers import DBHandler

basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%d-%b-%y %H:%M:%S',
            level=DEBUG)
format = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

backup_logger = getLogger('backup_logger')
file_handler = FileHandler('file.log')
file_handler.setLevel(DEBUG)
file_handler.setFormatter(format)
backup_logger.addHandler(file_handler)

db_logger = getLogger('logger')
db_handler = DBHandler(backup_logger_name='backup_logger')
db_handler.setLevel(DEBUG)
db_handler.setFormatter(format)
db_logger.addHandler(db_handler)

if __name__ == "__main__":
    db_logger.debug('debug: hello world!')
    db_logger.info('info: hello world!')
    db_logger.warning('warning: hello world!')
    db_logger.error('error: hello world!')
    db_logger.critical('critical: hello world!!!!')

你可以看到,这个处理器接受一个备份记录器,当数据库插入失败时可以使用它。

一个不错的改进是可以通过线程来实现数据库日志记录。

41

我最近用Python写了一个自己的数据库记录器。因为找不到相关的例子,所以我想把我的分享出来。这个记录器可以和MS SQL数据库一起使用。

数据库表的结构可能是这样的:

CREATE TABLE [db_name].[log](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [log_level] [int] NULL,
    [log_levelname] [char](32) NULL,
    [log] [char](2048) NOT NULL,
    [created_at] [datetime2](7) NOT NULL,
    [created_by] [char](32) NOT NULL,
) ON [PRIMARY]

下面是这个类的代码:

class LogDBHandler(logging.Handler):
    '''
    Customized logging handler that puts logs to the database.
    pymssql required
    '''
    def __init__(self, sql_conn, sql_cursor, db_tbl_log):
        logging.Handler.__init__(self)
        self.sql_cursor = sql_cursor
        self.sql_conn = sql_conn
        self.db_tbl_log = db_tbl_log

    def emit(self, record):
        # Set current time
        tm = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.created))
        # Clear the log message so it can be put to db via sql (escape quotes)
        self.log_msg = record.msg
        self.log_msg = self.log_msg.strip()
        self.log_msg = self.log_msg.replace('\'', '\'\'')
        # Make the SQL insert
        sql = 'INSERT INTO ' + self.db_tbl_log + ' (log_level, ' + \
            'log_levelname, log, created_at, created_by) ' + \
            'VALUES (' + \
            ''   + str(record.levelno) + ', ' + \
            '\'' + str(record.levelname) + '\', ' + \
            '\'' + str(self.log_msg) + '\', ' + \
            '(convert(datetime2(7), \'' + tm + '\')), ' + \
            '\'' + str(record.name) + '\')'
        try:
            self.sql_cursor.execute(sql)
            self.sql_conn.commit()
        # If error - print it out on screen. Since DB is not working - there's
        # no point making a log about it to the database :)
        except pymssql.Error as e:
            print sql
            print 'CRITICAL DB ERROR! Logging to database not possible!'

还有一个使用示例:

import pymssql
import time
import logging

db_server = 'servername'
db_user = 'db_user'
db_password = 'db_pass'
db_dbname = 'db_name'
db_tbl_log = 'log'

log_file_path = 'C:\\Users\\Yourname\\Desktop\\test_log.txt'
log_error_level     = 'DEBUG'       # LOG error level (file)
log_to_db = True                    # LOG to database?

class LogDBHandler(logging.Handler):
    [...]

# Main settings for the database logging use
if (log_to_db):
    # Make the connection to database for the logger
    log_conn = pymssql.connect(db_server, db_user, db_password, db_dbname, 30)
    log_cursor = log_conn.cursor()
    logdb = LogDBHandler(log_conn, log_cursor, db_tbl_log)

# Set logger
logging.basicConfig(filename=log_file_path)

# Set db handler for root logger
if (log_to_db):
    logging.getLogger('').addHandler(logdb)
# Register MY_LOGGER
log = logging.getLogger('MY_LOGGER')
log.setLevel(log_error_level)

# Example variable
test_var = 'This is test message'

# Log the variable contents as an error
log.error('This error occurred: %s' % test_var)

上面的代码会同时把记录写入数据库和文件。如果不需要写入文件,可以跳过那一行 'logging.basicConfig(filename=log_file_path)'。所有通过 'log' 记录的信息都会以 MY_LOGGER 的名字记录。如果出现外部错误(比如导入的模块出错等),错误信息会显示为 'root',因为 'root' 记录器也在工作,并且使用了数据库处理器。

18

自己写一个处理器,把日志记录到你想要的数据库里。如果这个过程出错了,你可以把它从日志记录器的处理器列表中删掉。处理出错的情况有很多种方法可以解决。

撰写回答