在Python中监控文件是否停止写入

问题描述 投票:0回答:5

我有一个程序每秒不断写入文件。文件写入是在与 UI 并行的线程中进行的。由于某些硬件问题,它有时会停止写入。我想检查文件是否停止写入,如果没有更新则重新启动程序。我想检查文件的时间戳,看看它是否没有更新(并且不想访问看门狗等。因为我只需要文件是否停止写入。)

try:
    if time.time()>(os.stat(filename).st_mtime+2):
        raise ValueError("Yikes! Spike")
except ValueError:
    with open('errors.log','a') as log:
        log.write('Spike occured at '+ time.strftime(
        "%H:%M:%S")+' on '+datetime.date.today().strftime('%d/%m/%Y')+'\n')
        log.close()
    restart_program()

该块每秒运行一次。但这适得其反,当应用程序关闭以重新启动时,它每秒都会关闭并且不会再次启动。我每秒都会收到记录的异常消息。我尝试增加时差,但这没有帮助。

接下来我尝试了

ftimestamp = os.stat(filename).st_mtime
try:
    if os.stat(filename).st_mtime>=ftimestamp:
        ftimestamp = time.time()
        print "ftimestamp updated and all is well"
    else:
        ftimestamp = os.stat(filename).st_mtime
        raise ValueError("Yikes! Spike!")
        print "file time is behind"
except ValueError:
    with open('errors.log','a') as log:
        log.write('Spike occured at '+ time.strftime(
        "%H:%M:%S")+' on '+datetime.date.today().strftime('%d/%m/%Y')+'\n')
        log.close()
    restart_program()

我尝试将变量“ftimestamp”更新为当前时间“time.time()”,因为下一次比较仅在一秒后发生,并且我希望文件时间高于上一次比较的时间。 (该块通过 wx.CallLater 函数每秒运行一次)。

我的程序仍然失败...而且我不明白我哪里出了问题...请有人帮忙!或者有没有一种方法可以简单地检查文件是否停止写入?

python file timestamp
5个回答
2
投票

我们可以尝试通过执行以下操作来检查文件大小的变化作为可能的解决方案:

import os
from time import sleep
# other imports

while True:
    file1 = os.stat('file.txt') # initial file size
    file1_size = file1.st_size
 
    # your script here that collects and writes data (increase file size)
    sleep(1)
    file2 = os.stat('file.txt') # updated file size
    file2_size = file2.st_size
    comp = file2_size - file1_size # compares sizes
    if comp == 0:
        restart_program()
    else:
        sleep(5)

您可能需要相应地调整

sleep()
函数,这些只是我正在使用的估计值,因为我无法测试您的实际代码。最后,这是一个无限循环,只要您希望脚本继续写入,它就会继续运行。

另一个解决方案是将您的代码更新为:

import os
import sys
from time import sleep
# other imports

while True:
    file1 = os.stat('file.txt') # initial file size
    file1_size = file1.st_size
 
    # your script here that collects and writes data (increase file size)
    sleep(1)
    file2 = os.stat('file.txt') # updated file size
    file2_size = file2.st_size
    comp = file2_size - file1_size # compares sizes
    if comp == 0:
        sys.exit
    else:
        sleep(5)

然后使用辅助程序来运行脚本,如下所示:

import os
from time import sleep, strftime

while True:
    print(strftime("%H:%M:%S"), "Starting"))
    system('main.py') # this is another infinite loop that will keep your script running
    print(strftime("%H:%M:%S"), "Crashed"))
    sleep(5)

1
投票

要确定文件是否在 GUI 程序中按时更改,您可以使用事件循环的标准工具,每隔几秒运行一个函数,例如,以下是如何在

tkinter
中执行此操作:

#!/usr/bin/env python3
import logging
import os
import sys
import tkinter
from datetime import datetime
from time import monotonic as timer, localtime

path = sys.argv[1]
interval = 120 # check every 2 minutes

def check(last=[None]):
    mtime = os.path.getmtime(path) # or os.path.getsize(path)
    logging.debug("mtime %s", datetime.fromtimestamp(mtime))
    if last[0] == mtime: #NOTE: it is always False on the first run
        logging.error("file metadata hasn't been updated, exiting..")
        root.destroy() # exit GUI
    else: # schedule the next run
        last[0] = mtime
        root.after(round(1000 * (interval - timer() % interval)), check)


logging.basicConfig(level=logging.DEBUG,
                    filename=os.path.splitext(__file__)[0] + ".log",
                    format="%(asctime)-15s %(message)s", datefmt="%F %T")
root = tkinter.Tk()
root.withdraw() # hide GUI
root.after(round(1000 * (interval - timer() % interval)), check) # start on boundary
root.mainloop()

您可以使用supervisord、systemd、upstart等来自动重生您的脚本。

请参阅如何在 python 中定期运行函数


1
投票

最后,在修改了基于时间戳的选项之后,以下内容似乎对我有用。

try:
    if time.time()-os.stat(filename).st_mtime>6:
        touch(filename)
        raise ValueError("Yikes! Spike")
except ValueError:
    with open('errors.log','a') as log:
        log.write('Spike/App restarting occured at '+ time.strftime(
                "%H:%M:%S")+' on '+datetime.date.today().strftime('%d/%m/%Y')+'\n')
        log.close()
    restart_program()

之前的问题是它会检测到文件在给定的时间间隔内停止写入并继续满足相同的要求。

time.time()-os.stat(filename).st_mtime>6 

但是一旦满足这个条件,除非文件时间戳被更新,否则它会继续满足这个条件并会不断重新启动程序。现在在我的解决方案中,我“触摸”了一次文件(从这里使用触摸)条件得到满足,现在它按预期工作。

感谢大家的投入。


0
投票

获取文件大小并等待文件保存的更好版本:

import os

def get_file_size(file_path, wait_until_file_saved=False):

    file = os.stat(file_path)
    file_size = file.st_size

    if wait_until_file_saved:
        sleep_time = 0.5
        time.sleep(sleep_time)

        while True:
            file = os.stat(file_path)
            curr_file_size = file.st_size

            diff = curr_file_size - file_size  # compares sizes
            if diff == 0:
                break
            else:
                time.sleep(sleep_time)
            file_size = curr_file_size

    # print("file_size : ", file_size)
    return file_size

0
投票

检查和获取文件大小并等待文件保存的更好版本:

import os

def get_file_size(file_path, wait_until_file_saved=False):

    file = os.stat(file_path)
    file_size = file.st_size

    if wait_until_file_saved:
        sleep_time = 0.5
        time.sleep(sleep_time)

        while True:
            file = os.stat(file_path)
            curr_file_size = file.st_size

            diff = curr_file_size - file_size  # compares sizes
            if diff == 0:
                break
            else:
                time.sleep(sleep_time)
            file_size = curr_file_size

    # print("file_size : ", file_size)
    return file_size
© www.soinside.com 2019 - 2024. All rights reserved.