Python 删除符合未知模式的旧文件(棘手)

2 投票
7 回答
1283 浏览
提问于 2025-04-17 14:55

我的服务器空间快满了,我需要自动删除一些文件。文件一般是每天都会添加到服务器上,但有时候会暂停,可能是每两周或每个月才来一次。有时候几个月都没有新文件,然后又会开始,这种情况很不稳定。

我的脚本需要删除那些超过30天的文件,但一定要保留每种文件模式中最新的5个文件。这部分比较复杂。

文件的唯一可预测的特点是,文件名中总会有一个格式为yyyymmddhhmmss的时间戳,其他的文件名部分则不太固定。如果文件名里没有时间戳,我就不想删除它。

举个例子,目录里可能会有这样的文件:

20121118011335_team1-pathway_Truck_Report_Data_10342532.zip
20121119011335_team1-pathway_Truck_Report_Data_102345234.zip
20121120011335_team1-pathway_Truck_Report_Data_10642224.zip
20121121011335_team1-pathway_Truck_Report_Data_133464.zip
20121122011335_team1-pathway_Truck_Report_Data_126434344.zip
20121123011335_team1-pathway_Truck_Report_Data_12444656.zip
20121124011335_team1-pathway_Truck_Report_Data_1624444.zip
20121125011335_team1-pathway_Truck_Report_Data_3464433.zip
randomefilewithnodate.zip
20121119011335_team2-Paper_Size_Report_336655.zip
20121120011335_team2-Paper_Size_Report_336677.zip
20121121011335_team2-Paper_Size_Report_338877.zip
20121122011335_team2-Paper_Size_Report_226688.zip
20121123011335_team2-Paper_Size_Report_776688.zip
20121124011335_team2-Paper_Size_Report_223355.zip
20121125011335_team2-Paper_Size_Report_111111.zip

在这种情况下,我的脚本应该只删除第一个模式中最旧的3个文件:

20121118011335_team1-pathway_Truck_Report_Data_10342532.zip
20121119011335_team1-pathway_Truck_Report_Data_102345234.zip
20121120011335_team1-pathway_Truck_Report_Data_10642224.zip

以及第二个模式中最旧的2个文件:
20121119011335_team2-Paper_Size_Report_336655.zip
20121120011335_team2-Paper_Size_Report_336677.zip

这样就能保留最新的5个文件,并且不会动到没有日期的文件

我的问题是,我不知道yyyymmddhhmmss_后面会跟什么。我只知道它会是各种不同的形式,比如yyyymmddhhmmss_something_consistent_random_random或者yyyymmddhhmmss_something_consistent_something_consistent_random_random.xyz。

到目前为止,我已经写出了一个正则表达式来匹配时间戳是否存在,但我想不出怎么让我的脚本聪明到能够识别文件的其他模式,并保留5个最新的文件。

欢迎任何想法!下面的脚本还不完美,我可以修正小错误。

我主要需要帮助的是保留最新5个文件的部分

额外的问题是关于时间戳的部分。

def myCleansingMethod(self, client)

    # Get rid of things older than 30 days
    # 30 days has this many seconds 30 * 24 * 60 * 60
    numberOfSeconds = 2592000
    # establish what the epoc time of the oldest file I want to keep is
    oldestFileThatIWantToKeep = time.time() - numberOfSeconds
    #establish my working directory
    workingDirectory = "/home/files/%s" % (client)
    try:
        files = os.listdir(workingDirectory)
        except:
        print "Could not find directory"
        return

        files.sort()
        for file in files:
            # define Full File Name (path + file)
            fullFileName = "%s/%s" % (workingDirectory, file)
            # make sure the file contains yyyymmddhhmmss
            match = re.search(r'[0-9]{4}(1[0-2]|0[1-9])(3[01]|[12][0-9]|0[1-9])([01]\d|2[0123])([0-5]\d){2}', file)
            if match:
                #get what was matched in the RegEx
                fileTime = match.group()
                #convert fileTime to Epoc time
                fileTimeToEpoc = (fileTime + NOT SURE HOW TO DO THIS PART YET)

                if fileTimeToEpoc < oldestFileThatIWantToKeep AND (CODE THAT MAKES SURE   THERE ARE AT LEAST 5 FILES OF THE SAME PATTERN PRESENT) :
                print "Delete file: %s\t%s" % (fileTimeToEpoc, fullFileName)
                command = "rm -Rf %s" % fullFileName
                print command
                os.system (command)
                else:
                pass  
            else:
            pass

7 个回答

1

这些文件唯一可以预测的特点就是,它们总是包含一个格式为yyyymmddhhmmss的时间戳,并且有一些重复的模式。

为了让文件名中可以随意出现yyyymmddhhmmss,并且能够自动找到重复的模式,你可以先把文件名中的yyyymmddhhmmss去掉,然后找出至少重复两次的最长前缀作为重复模式。

import os
from itertools import groupby
from os.path import commonprefix

def files_to_delete(topdir):
    for rootdir, dirs, files in os.walk(topdir):
        # find files with yyyymmddhhmmss
        files_with_date = []
        for filename in files:
            for m in re.finditer(r"(?:^|\D)(\d{14})(?:\D|$)", filename):
                date = parse_date(m.group(1))
                if date is not None: # found date in the filename
                   # strip date
                   no_date = filename[:m.start(1)] + filename[m.end(1):]
                   # add to candidates for removal
                   files_with_date.append((no_date, date, filename))
                   break

        # find repeating pattern
        files_with_date.sort() # sort by filename with a removed date
        # given ["team1-a", "team2-b", "team2-c"]
        # yield [["team1-a"], ["team2-b", "team2-c"]] where 
        #    roots are "team1" and "team2"
        # reject [["team1-a", "team2-b", "team2-c"]] grouping (root "team")
        #     because the longer root "team2" occurs more than once
        roots = [commonprefix(a[0],b[0]) for a,b in pairwise(files_with_date)]
        roots.sort(key=len, reverse=True) # longest roots first
        def longest_root(item):
            no_date = item[0]
            return next(r for r in roots if no_date.startswith(r)) or no_date
        for common_root, group in groupby(files_with_date, key=longest_root):
            # strip 5 newest items (sort by date)
            for _, d, filename in sorted(group, key=lambda item: item[1])[:-5]:
                if d < month_ago: # older than 30 days
                   yield os.path.join(rootdir, filename)

注意:['team1-a', 'team2-b', 'team3-c', ...]这些文件名是通过'team'这个重复模式组合在一起的,变成了[['team1-a', 'team2-b', 'team3-c', ...]]。也就是说,如果“重复模式”在文件列表中没有重复,那么上面的算法就会失败。

工具:

from datetime import datetime, timedelta
from itertools import izip, tee

month_ago = datetime.utcnow() - timedelta(days=30)

def parse_date(yyyymmddhhmmss):
    try: return datetime.strptime(yyyymmddhhmmss, "%Y%m%d%H%M%S")
    except ValueError:
         return None

def pairwise(iterable): # itertools recipe
    a, b = tee(iterable)
    next(b, None)
    return izip(a, b)       

如果你想删除一个文件,可以用os.remove(path)来代替os.system()

如果将来你能改变文件的命名规则,让它们更有规律,比如在文件名中用[]包围住模式,那么你就可以提取出根名,如下所示:

root = re.match(r'[^[]*\[([^]]+)\]', filename).group(1)
1

你需要做的难点不是编程问题,而是定义问题,所以光靠写更好的代码是解决不了的 :-)

为什么 20121125011335_team1-pathway_Truck_Report_Data_3464433.zip20121118011335_team1-pathway_Truck_Report_Data_10342532.zip 还有 20121119011335_team1-pathway_Truck_Report_Data_102345234.zip 会被归为同一组呢?你是怎么意识到它们共同的重要部分是 _team1-pathway_Truck_Report_Data_ 而不是 _team1-pathway_Truck_Report_Data_1 的呢?

回答这个问题(我猜答案会涉及到“下划线”和/或“数字”这几个词),你就能找到解决的方向。

我只知道它会是类似于 yyyymmddhhmmss_something_consistent_random_random 或者 yyyymmddhhmmss_something_consistent_something_consistent_random_random.xyz 的各种变体。

如果这就是所有可能的变化,那我建议你寻找被下划线包围的共同开头部分。这是可行的,因为随机的内容总是在最后,所以如果你想把文件扩展名当作重要内容,那你需要特别处理它(例如,把它移到你比较的字符串前面)。如果你发现几个文件有三个“词”是相同的,但没有四个,那么你就可以假设第四个部分是“随机的”,而前三个部分是“一致的”。然后你把这类文件按日期排序,取出最新的五个,删除其余超过30天的文件。

找到这些共同开头部分的“显而易见”的方法是按文件名的组成部分(不包括日期)进行字典顺序排序。这样,具有共同开头部分的文件就会相邻,你可以逐个比较每个文件与当前最长的共同前缀文件组。

在编码时,要确保如果可能出现以下情况,你能正确处理:

<some_date>_truck1_548372.zip
<some_date>_truck1_847284.zip
<some_date>_truck1_data_4948739.zip
<some_date>_truck1_data_9487203.zip

也就是说,要确保你知道在这种情况下你是处理一个组(“truck1”),还是两个组(“truck1”和“truck1_data”)。这很重要,因为你可能想要排除任何 truck1_data 文件,而不需要保留5个 truck1 文件。


另一种方法:

  • 找到所有超过30天的文件(例如 <some_date>_truck1_57349.zip),并按从旧到新的顺序排序
  • 对于每个文件,寻求“删除”的“许可”,步骤如下:
    • 去掉文件名开头的日期
    • 搜索所有文件(不仅仅是超过30天的),忽略它们自己的日期,找出与这个文件有共同的下划线包围的开头子串(这样我们就找到了 truck1 文件和 truck1_data 文件)
    • 找到这些文件后,找出至少两个文件共享的最长子串(truck1_data
    • 如果目标文件不共享那个子串,就从集合中删除所有有共同子串的文件,然后重复上一步(现在我们只剩下 truck1 文件)
    • 一旦目标文件共享了那个子串,就统计一下。如果有至少5个,就删除目标文件。

如上所述,这种方法不必要地慢,但我认为它简单地说明了问题。在最后一步,你实际上可以删除剩下的文件中除了5个以外的所有文件,并将这5个文件从未来的考虑中移除,因为你已经识别出一组文件。同样,当你删除所有共享的子串比目标文件更长的文件时,你也识别出了一组文件,这样你就可以将其作为一个整体处理,而不是再把它扔回去等待未来的识别。

3

这个任务挺有意思的,我大量使用了函数式编程的模式,主要是用到了 itertools 这个库。我喜欢使用迭代器,因为它们可以处理很大的列表,而且函数式编程的思想让代码更易读、更好维护。

首先,我们需要从 itertools 和 datetime 导入一些东西:

from itertools import groupby, chain
from datetime import datetime

接下来,把你的样本文件名放到一个列表里:

filenames = """20121118011335_team1-pathway_Truck_Report_Data_10342532.zip
20121119011335_team1-pathway_Truck_Report_Data_102345234.zip
20121120011335_team1-pathway_Truck_Report_Data_10642224.zip
20121121011335_team1-pathway_Truck_Report_Data_133464.zip
20121122011335_team1-pathway_Truck_Report_Data_126434344.zip
20121123011335_team1-pathway_Truck_Report_Data_12444656.zip
20121124011335_team1-pathway_Truck_Report_Data_1624444.zip
20121125011335_team1-pathway_Truck_Report_Data_3464433.zip
randomefilewithnodate.zip
20121119011335_team2-Paper_Size_Report_336655.zip
20121120011335_team2-Paper_Size_Report_336677.zip
20121121011335_team2-Paper_Size_Report_338877.zip
20121122011335_team2-Paper_Size_Report_226688.zip
20121123011335_team2-Paper_Size_Report_776688.zip
20121124011335_team2-Paper_Size_Report_223355.zip
20121125011335_team2-Paper_Size_Report_111111.zip""".split("\n")

这里有一些辅助函数。函数名应该很容易理解。

def extract_date(s):
    return datetime.strptime(s.split("_")[0], "%Y%m%d%H%M%S")

def starts_with_date(s):
    try:
        extract_date(s)
        return True
    except Exception:
        return False

下一个方法你可能需要调整一下,如果它没有覆盖所有情况的话——对于你的样本数据来说,它是可以的。

def get_name_root(s):
    return "".join(s.split(".")[0].split("_")[1:-1])

def find_files_to_delete_for_group(group):
    sorted_group = sorted(group, key=extract_date)
    return sorted_group[:-5]        

现在,整个过程可以通过一些迭代来完成。首先,我会过滤文件名列表,把那些不以特定日期开头的文件名过滤掉。然后,把剩下的文件名按它们的“名称根”分组(我想不出更好的名字了)。

fn_groups = groupby(
                filter(
                    starts_with_date,
                    filenames),
                get_name_root
            )

接下来,对于每一组,我会应用一个过滤方法(见上文),找出那些不包含最新五个日期的文件名。每组找到的结果会被 chain 起来,也就是说,从多个列表中创建一个迭代器:

fns_to_delete = chain(*[find_files_to_delete_for_group(g) for k, g in fn_groups])

最后,为了方便检查结果,我把这个迭代器转换成一个列表并打印出来:

print list(fns_to_delete)

这个脚本的输出结果是:

['20121118011335_team1-pathway_Truck_Report_Data_10342532.zip', '20121119011335_team1-pathway_Truck_Report_Data_102345234.zip', '20121120011335_team1-pathway_Truck_Report_Data_10642224.zip', '20121119011335_team2-Paper_Size_Report_336655.zip', '20121120011335_team2-Paper_Size_Report_336677.zip']

如果有什么不明白的地方,随时问我。

这里是完整的脚本,方便你直接复制粘贴:

from itertools import groupby, chain
from datetime import datetime

filenames = """20121118011335_team1-pathway_Truck_Report_Data_10342532.zip
20121119011335_team1-pathway_Truck_Report_Data_102345234.zip
20121120011335_team1-pathway_Truck_Report_Data_10642224.zip
20121121011335_team1-pathway_Truck_Report_Data_133464.zip
20121122011335_team1-pathway_Truck_Report_Data_126434344.zip
20121123011335_team1-pathway_Truck_Report_Data_12444656.zip
20121124011335_team1-pathway_Truck_Report_Data_1624444.zip
20121125011335_team1-pathway_Truck_Report_Data_3464433.zip
randomefilewithnodate.zip
20121119011335_team2-Paper_Size_Report_336655.zip
20121120011335_team2-Paper_Size_Report_336677.zip
20121121011335_team2-Paper_Size_Report_338877.zip
20121122011335_team2-Paper_Size_Report_226688.zip
20121123011335_team2-Paper_Size_Report_776688.zip
20121124011335_team2-Paper_Size_Report_223355.zip
20121125011335_team2-Paper_Size_Report_111111.zip""".split("\n")

def extract_date(s):
    return datetime.strptime(s.split("_")[0], "%Y%m%d%H%M%S")

def starts_with_date(s):
    try:
        extract_date(s)
        return True
    except Exception:
        return False

def get_name_root(s):
    return "".join(s.split(".")[0].split("_")[1:-1])

def find_files_to_delete_for_group(group):
    sorted_group = sorted(group, key=extract_date)
    return sorted_group[:-5]        

fn_groups = groupby(
                filter(
                    starts_with_date,
                    filenames),
                get_name_root
            )

fns_to_delete = chain(*[find_files_to_delete_for_group(g) for k, g in fn_groups])

print list(fns_to_delete)

撰写回答