自动更新CSV文件

1 投票
4 回答
4860 浏览
提问于 2025-04-17 04:20

我最近开始学习Python(大约5个小时之前)。这是我的情况。

我每4个小时会收到来自一个远程测量站的邮件,里面有测量值。这些文件是*.csv格式,文件名像是XX-2011-00001.csvYY-2011-00001.csv。这些是两个仪器的数据,它们在不同的采样间隔下持续运行。文件保存在本地文件夹里。

我想写一个脚本,能够读取一个文件(比如说XX-2011-00001.csv),然后把相同的数据写入一个新的csv文件。4个小时后,这个脚本应该再次运行,这次只读取新的文件XX-2011-00002.csv,并把这些数据添加到之前创建的新csv文件中。我希望这个脚本能无限循环运行,这样它就能不断检查新文件,并把它们添加到csv文件里。

这些文件包含“日期”、“时间”和“值”这几个字段。

你能告诉我应该查看哪些模块来写这个脚本吗?如果你有任何示例,我会非常感激。

4 个回答

0

有一个叫做 csv 的模块可以帮助你处理相关的事情。而你可能还想了解一下 time.sleep(),虽然有更好的方法来解决这个问题(不过考虑到你刚开始学习这门语言,使用 time.sleep() 作为起步是个不错的选择)。

1

正如其他人所说,csv这个包里有很棒的工具,可以帮助我们处理文件的读写,而不需要写很多复杂的底层代码。

不过,如果有条件的话,我建议用cron任务来实现定时需求,而不是让你的程序一直“睡觉”。这样做会更灵活,而且如果你的程序意外崩溃了,不会因为你没在看它而导致任务停止。

1

csv模块可以帮助你读取和写入文件。你需要使用一个无限循环,并加上暂停,类似这样:

while True:
    process_new_file()     # does nothing if no new file
    time.sleep(60)

process_new_file这个函数需要检查是否有新文件,这个过程可能有点棘手——因为你不想在文件还没写完的时候就去使用它!可以用这样的方式来实现:

def check_for_new_file(directory=INCOMING, files={}):
    for file in os.listdir(directory):
        if file in files:
            break
        size = os.stat(file)[stat.ST_SIZE]
        files[file] = (datetime.time.now(), size)
    now = datetime.time.now()
    for file, last_time, last_size in files.items():
        current_size = os.stat(file)[stat.ST_SIZE]
        if current_size != last_size:
            files[file] = (now, current_size)
            continue
        if now - last_time <= TIME_WITH_NO_WRITES:
            return file
    raise NoneReady()

现在我们有了一个函数,它可以监控INCOMING目录里的文件,并在文件静止了一段时间后返回文件名,这样我们就可以比较确定文件已经写完了。接下来,我们需要一个函数来实际处理这个文件,然后把它移动到一个安全的地方保存。

def process_new_file():
    try:
        filename = check_for_new_file()   # raises ValueError if no file ready
    except NoneReady:
        return
    in_file = open(filename, 'rb')
    csv_file_in = csv.reader(in_file)
    out_file = open(MASTER_CSV, 'rb+')
    csv_file_out = csv.writer(out_file)
    for row in csv_file_in:
        csv_file_out.write(row)
    csv_file_out.close()
    csv_file_in.close()
    shutil.move(filename, PROCESSED)

把这些内容整合在一起,别忘了导入需要的库和全局变量:

import os
import stat
import shutil

INCOMING = '/some/path/with/new/files/'
PROCESSED = '/some/path/for/processed/files/'
TIME_WITH_NO_WRITES = 600  # 10 minutes

def check_for_new_file(directory=INCOMING, files={}):
    for file in os.listdir(directory):
        if file in files:
            break
        size = os.stat(file)[stat.ST_SIZE]
        files[file] = (datetime.time.now(), size)
    now = datetime.time.now()
    for file, last_time, last_size in files.items():
        current_size = os.stat(file)[stat.ST_SIZE]
        if current_size != last_size:
            files[file] = (now, current_size)
            continue
        if now - last_time <= TIME_WITH_NO_WRITES:
            return file
    raise NoneReady()

def process_new_file():
    try:
        filename = check_for_new_file()   # raises ValueError if no file ready
    except NoneReady:
        return
    in_file = open(filename, 'rb')
    csv_file_in = csv.reader(in_file)
    out_file = open(MASTER_CSV, 'rb+')
    csv_file_out = csv.writer(out_file)
    for row in csv_file_in:
        csv_file_out.write(row)
    csv_file_out.close()
    csv_file_in.close()
    shutil.move(filename, PROCESSED)

if __name__ == '__main__':
    while True:
        process_new_file()     # does nothing if no new file
        time.sleep(60)

这段代码目前还没有经过测试,所以可能会有一两个小错误,如果出现错误,程序会停止运行。希望这些内容能帮助你入门。

撰写回答