Rotating logs into a directory with Python

4 votes
3 answers
8616 views
Asked on 2025-04-16 12:59

I have a file called Poller.log that is constantly appended with log entries. I want this log file to be rotated daily and to keep at most 30 days of backups. The code below works fine for that.

Now I'd like the rotated logs to end up in a folder (e.g. logs/poller.log.2011-03-04_15-36). Is there a way to specify where the rotated file should go?

This Python script is run by cron.

import logging
import logging.handlers

LOG_FILENAME = '/home/stackoverflow/snmpdata/poller.log'

# Set up a specific logger with our desired output level
poll_logger = logging.getLogger('pollerLog')

# Add the log message handler to the logger
log_rotator = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME, when='d', interval=1, backupCount=30, encoding=None, delay=False, utc=False)
poll_logger.addHandler(log_rotator)

# Roll over on application start
poll_logger.handlers[0].doRollover()

3 Answers

0

I added some code to move any log backups into a folder.

import logging
import logging.handlers
import shutil, os, glob
import zipfile
import schedule
import time
import threading

zip_file_name = "Log.zip"
zip_file_path = "Logs/LogsArchive/Log.zip"

source_directory = "Logs"
archive_directory = "Logs/LogsArchive"

# Module-level loggers used below (assumed to be configured elsewhere in this LoggingModule)
debug_logger = logging.getLogger("LoggingModule.debug")
error_logger = logging.getLogger("LoggingModule.error")


def moveAllFilesinDir(srcDir, dstDir, allLogs=False):
    try:
        # Check that both paths are directories
        if os.path.isdir(srcDir) and os.path.isdir(dstDir):
            # Iterate over all the files in the source directory
            if not allLogs:
                for filePath in glob.glob(srcDir + '/*.*.*'):
                    # Move each rotated log file to the destination directory
                    shutil.move(filePath, dstDir)
            else:
                for filePath in glob.glob(srcDir + '/*.*'):
                    # Copy every file (including the live logs) to the destination directory
                    shutil.copy(filePath, dstDir)

        else:
            debug_logger.debug("LoggingModule: - moveAllFilesinDir - srcDir & dstDir should be Directories")
    except Exception as ex:
        error_logger.error("Error in LoggingModule - moveAllFilesinDir", exc_info=True)


Only log files whose names have three dot-separated parts, in the form "name.log.date", get moved. I'm now working on a process to zip up the archive folder.

Update: here is the zipping process.

def createZipDir(path):
    # Delete the old zip file if it exists, but leave it in place if there are no other files to archive
    if len(os.listdir(path)) > 1:
        zipFile = zip_file_path
        if os.path.isfile(zipFile):
            os.remove(zipFile)
        zipf = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
        for root, dirs, files in os.walk(path):
            for file in files:
                # Don't add the zip archive itself to the archive
                if file != zip_file_name:
                    zipf.write(os.path.join(root, file))
        zipf.close()
    else:
        debug_logger.debug("LoggingModule: - createZipDir - no files found, zip file left in place.")

Deleting the old files:

def deleteOldFilesinDir(srcDir):
    try:
        # Check that srcDir is a directory
        if os.path.isdir(srcDir):
            # Iterate over all the files in the source directory
            for filePath in glob.glob(srcDir + '/*.*'):
                # Delete everything except the zip archive itself
                if filePath != zip_file_path:
                    os.remove(filePath)
        else:
            print("srcDir should be a directory")
    except Exception as ex:
        error_logger.error("Error in LoggingModule - deleteOldFilesinDir", exc_info=True)

Here is the whole process. I have the archive task set up to run once a week:


def runArchiveProcess(allFiles = False):
    debug_logger.debug("LoggingModule: Archive process started.")
    moveAllFilesinDir(source_directory, archive_directory, allFiles)
    createZipDir(archive_directory)
    deleteOldFilesinDir(archive_directory)
    debug_logger.debug("LoggingModule Archive process completed.")

And here's the scheduling part:

#only kicked off in own thread...
def runScheduler():
    debug_logger.debug("LoggingModule - runScheduler - don't call this function outside of LoggingModule as it runs in own thread.")
    schedule.every().monday.at("00:00:00").do(runArchiveProcess)
    #schedule.every(10).seconds.do(runArchiveProcess) #for testing

    try:
        while True:
            debug_logger.debug("LoggingModule checking scheduler...")
            #Checks whether a scheduled task is pending to run or not
            schedule.run_pending()
            debug_logger.debug("LoggingModule Scheduler sleeping...")
            time.sleep(60 * 60) # checks every 1 hour
            #time.sleep(10)  # for testing
    except Exception as ex:
        error_logger.error("Error in LoggingModule - runScheduler", exc_info=True)


def runSchedulerThread():
    thread = threading.Thread(target=runScheduler)
    thread.start()
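
To tie it together, a minimal usage sketch using only the functions defined above (runScheduler blocks, which is why it is only ever started on its own thread):

# Start the weekly archiver in the background when the application starts
runSchedulerThread()

# Optionally trigger an archive pass immediately, including the live logs
runArchiveProcess(allFiles=True)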

2

If you don't mind the extra dependency, you can use the rotating log module from Twisted. Twisted's log file module supports daily, weekly, and even monthly log files, which fits this case well.
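
For example, a minimal sketch with twisted.python.logfile.DailyLogFile (the logs directory here is an assumption; DailyLogFile keeps the live poller.log in that directory and renames it with a date suffix when the day changes):

from twisted.python import log, logfile

# Assumed directory for both the live log and its rotated copies
daily_log = logfile.DailyLogFile("poller.log", "/home/stackoverflow/snmpdata/logs")
log.startLogging(daily_log)
log.msg("poller started")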

5

Python's logging handlers don't make this easy. There are two ways you could go about it:

  1. The simplest way is to set LOG_FILENAME to logs/poller.log; if you want to access poller.log somewhere else, just use a symlink :)

  2. You can create your own handler, starting from TimedRotatingFileHandler, by copy-pasting the doRollover() code from the TimedRotatingFileHandler class in /usr/lib/python2.X/logging/handlers.py and then changing (see the sketch below for a namer-based alternative on newer Python versions):

dfn = self.baseFilename + "." + time.strftime(self.suffix, timeTuple)

to:

dfn = os.path.join('logs', os.path.basename(self.baseFilename)) + "." + time.strftime(self.suffix, timeTuple)
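
On Python 3.3 and later there is a lighter-weight alternative to copy-pasting doRollover(): the rotating handlers expose a namer hook that receives the default rotated filename and returns the one to actually use. A minimal sketch, assuming a logs directory that already exists next to the log file; note that backupCount pruning scans the directory of the base log file, so old files redirected into logs/ would need their own cleanup:

import os
import logging
import logging.handlers

LOG_FILENAME = '/home/stackoverflow/snmpdata/poller.log'
ROTATED_DIR = '/home/stackoverflow/snmpdata/logs'   # assumed destination for rotated files

handler = logging.handlers.TimedRotatingFileHandler(
    LOG_FILENAME, when='d', interval=1, backupCount=30)

def rotate_into_logs_dir(default_name):
    # default_name looks like .../poller.log.2011-03-04; keep the name, change the directory
    return os.path.join(ROTATED_DIR, os.path.basename(default_name))

handler.namer = rotate_into_logs_dir

poll_logger = logging.getLogger('pollerLog')
poll_logger.addHandler(handler)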
