Rotating logs into a directory with Python

Problem description · votes: 0 · answers: 4

I have a file called poller.log that is continuously appended with log details. I want this log file to be rotated daily and capped at 30 days of backups. So far, that part works well with the code below.

Now I would like the rotated logs to live in a folder (i.e. logs/poller.log.2011-03-04_15-36). Is there any way to direct where the rotated files should be created?

This Python script will be executed by cron.

import logging
import logging.handlers

LOG_FILENAME = '/home/stackoverflow/snmpdata/poller.log'

# Set up a specific logger with our desired output level
poll_logger = logging.getLogger('pollerLog')

# Add the log message handler to the logger
log_rotator = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME, when='d', interval=1, backupCount=30, encoding=None, delay=False, utc=False)
poll_logger.addHandler(log_rotator)

# Roll over on application start
poll_logger.handlers[0].doRollover()
Tags: python · logging · rotation
4 Answers
5 votes

The Python logging handlers don't make this easy out of the box. There are two ways you could achieve it:

  1. The simplest: set LOG_FILENAME so the file already lives at logs/poller.log, and if you want to access poller.log anywhere else, use a symlink :)

  2. Create your own handler starting from TimedRotatingFileHandler: copy/paste doRollover() from the TimedRotatingFileHandler class in /usr/lib/python2.X/logging/handlers.py, and change:

dfn = self.baseFilename + "." + time.strftime(self.suffix, timeTuple)

to:

dfn = os.path.join('logs', os.path.basename(self.baseFilename)) + "." + time.strftime(self.suffix, timeTuple)
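On Python 3.3+ there is a third option that avoids copying doRollover() entirely: BaseRotatingHandler routes every rotated name through a rotation_filename() hook, so a small subclass can redirect just the rotated copies. A minimal sketch (the class name and the hard-coded 'logs' directory are illustrative):

```python
import logging.handlers
import os


class DirTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """TimedRotatingFileHandler that places rotated files in a 'logs' directory."""

    def rotation_filename(self, default_name):
        # default_name is e.g. '/path/poller.log.2011-03-04'; keep only the
        # file name and redirect the rotated copy into 'logs/'.
        os.makedirs('logs', exist_ok=True)
        return os.path.join('logs', os.path.basename(default_name))
```

The live poller.log stays where LOG_FILENAME points; only the dated backups move.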

2 votes

If you don't mind the extra dependency, you can always use the log rotation support in Twisted. Twisted has a log file module that allows daily logs, weekly logs, or even monthly logs as in this case.


0 votes

I added this code as a separate process to move any log backups into a folder.

import logging
import logging.handlers
import shutil, os, glob
import zipfile
import schedule
import time
import threading

zip_file_name = "Log.zip"
zip_file_path = "Logs/LogsArchive/Log.zip"

source_directory = "Logs"
archive_directory = "Logs/LogsArchive"


def moveAllFilesinDir(srcDir, dstDir, allLogs=False):
    try:
        # Check that both paths are directories
        if os.path.isdir(srcDir) and os.path.isdir(dstDir):
            # Iterate over all the files in the source directory
            if not allLogs:
                # Only rotated backups match the 'name.log.date' pattern
                for filePath in glob.glob(srcDir + '/*.*.*'):
                    # Move each backup to the destination directory
                    shutil.move(filePath, dstDir)
            else:
                # Copy (not move) everything, so the live log file keeps working
                for filePath in glob.glob(srcDir + '/*.*'):
                    shutil.copy(filePath, dstDir)
        else:
            debug_logger.debug("LoggingModule: - moveAllFilesinDir - srcDir & dstDir should be Directories")
    except Exception as ex:
        error_logger.error("Error in LoggingModule - moveAllFilesinDir", exc_info=True)


Only log files whose names have three dot-separated parts ("name.log.date") get moved. I'm now working on the process of zipping the archive folder.

Update: here is the zip process:

def createZipDir(path):
    #delete old zipfile if exists, but leave old zipfile if no other files exist
    if len(os.listdir(path)) > 1:
        zipFile = zip_file_path
        if os.path.isfile(zipFile):
            os.remove(zipFile)
        zipf = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
        for root, dirs, files in os.walk(path):
            for file in files:
                if file != zip_file_name:
                    zipf.write(os.path.join(root, file))
        zipf.close()
    else:
        debug_logger.debug("LoggingModule: - createZipDir - no files found, zip file left in place.")
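As a small sanity check after createZipDir() runs, the archive contents can be listed back with the same zipfile module (listArchivedLogs is an illustrative helper, not part of the module above):

```python
import zipfile


def listArchivedLogs(zip_path):
    # Return the file names stored inside the archive, e.g. for a debug log line.
    with zipfile.ZipFile(zip_path, 'r') as zf:
        return zf.namelist()
```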

Deleting the old files:

def deleteOldFilesinDir(srcDir):
    try:
        # Check that srcDir is a directory
        if os.path.isdir(srcDir):
            # Remove every file in the archive directory except the zip itself
            for filePath in glob.glob(srcDir + '/*.*'):
                if filePath != zip_file_path:
                    os.remove(filePath)
        else:
            print("srcDir should be a Directory")
    except Exception as ex:
        error_logger.error("Error in LoggingModule - deleteOldFilesinDir", exc_info=True)

The whole process is:

I set runArchiveProcess to run once a week.


def runArchiveProcess(allFiles = False):
    debug_logger.debug("LoggingModule: Archive process started.")
    moveAllFilesinDir(source_directory, archive_directory, allFiles)
    createZipDir(archive_directory)
    deleteOldFilesinDir(archive_directory)
    debug_logger.debug("LoggingModule Archive process completed.")

And the scheduler bit:

#only kicked off in own thread...
def runScheduler():
    debug_logger.debug("LoggingModule - runScheduler - don't call this function outside of LoggingModule as it runs in own thread.")
    schedule.every().monday.at("00:00:00").do(runArchiveProcess)
    #schedule.every(10).seconds.do(runArchiveProcess)  # for testing

    try:
        while True:
            debug_logger.debug("LoggingModule checking scheduler...")
            #Checks whether a scheduled task is pending to run or not
            schedule.run_pending()
            debug_logger.debug("LoggingModule Scheduler sleeping...")
            time.sleep(60 * 60) # checks every 1 hour
            #time.sleep(10)  # for testing
    except Exception as ex:
        error_logger.error("Error in LoggingModule - runScheduler", exc_info=True)


def runSchedulerThread():
    thread = threading.Thread(target=runScheduler)
    thread.start()


0 votes

The BaseRotatingHandler class in the logging module provides this interface:

class BaseRotatingHandler:
    def rotation_filename(self, default_name):
        if not callable(self.namer):
            result = default_name
        else:
            result = self.namer(default_name)
        return result

So you can create your own custom rotating handler like this:

import datetime
import re
from logging.handlers import RotatingFileHandler


class DayRotatingHandler(RotatingFileHandler):

    @staticmethod
    def get_previous(name):
        # 'name' looks like 'poller.log.1'; the numeric index counts days back
        basename, log_index = re.match(r"(.*)\.(\d+)$", name).groups()
        date = datetime.datetime.now() - datetime.timedelta(days=int(log_index))
        return f"logs/{basename}.{date:%Y-%m-%d}"

    namer = get_previous
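For the directory question in the original post, no subclass is needed at all on Python 3.3+: since rotation_filename() calls self.namer, a plain callable can be assigned to a stock handler instance. A minimal sketch reusing the question's parameters (delay=True is added only so the file isn't created until first use):

```python
import logging.handlers
import os

handler = logging.handlers.TimedRotatingFileHandler(
    'poller.log', when='d', interval=1, backupCount=30, delay=True)

# Redirect each rotated file (e.g. poller.log.2011-03-04) into logs/
handler.namer = lambda name: os.path.join('logs', os.path.basename(name))
```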