观察不断增长的文件尾部是否出现某些关键字的 Python 方式是什么?
在 shell 中我可能会说:
tail -f "$file" | grep "$string" | while read hit; do
#stuff
done
嗯,最简单的方法是不断读取文件,检查新内容并测试点击率。
import time
def watch(fn, words):
fp = open(fn, 'r')
while True:
new = fp.readline()
# Once all lines are read this just returns ''
# until the file changes and a new line appears
if new:
for word in words:
if word in new:
yield (word, new)
else:
time.sleep(0.5)
fn = 'test.py'
words = ['word']
for hit_word, hit_sentence in watch(fn, words):
print "Found %r in line: %r" % (hit_word, hit_sentence)
如果您知道数据将按行显示,则此解决方案与
readline
有效。
如果数据是某种流,您需要一个缓冲区,大于您正在寻找的最大
word
,并首先填充它。这样事情就变得有点复杂了...
def tail(f):
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.1)
continue
yield line
def process_matches(matchtext):
while True:
line = (yield)
if matchtext in line:
do_something_useful() # email alert, etc.
list_of_matches = ['ERROR', 'CRITICAL']
matches = [process_matches(string_match) for string_match in list_of_matches]
for m in matches: # prime matches
m.next()
while True:
auditlog = tail( open(log_file_to_monitor) )
for line in auditlog:
for m in matches:
m.send(line)
我用它来监视日志文件。 在完整的实现中,我将 list_of_matches 保留在配置文件中,以便它可以用于多种目的。 我的增强功能列表中包括对正则表达式的支持,而不是简单的“in”匹配。
编辑:正如下面的评论所述,
O_NONBLOCK
不适用于磁盘上的文件。 如果其他人来寻找来自套接字或命名管道或其他进程的尾部数据,这仍然会有所帮助,但它没有回答所提出的实际问题。 原始答案保留在下面,以供后代使用。 (调用 tail 和 grep 会起作用,但无论如何都不是答案。)
使用
O_NONBLOCK
打开文件并使用 select
轮询读取可用性,然后使用 read
读取新数据和字符串方法来过滤文件末尾的行...或者仅使用 subprocess
模块,让 tail
和 grep
为您完成工作,就像在 shell 中一样。
您可以使用 select 轮询文件中的新内容。
def tail(filename, bufsize = 1024):
fds = [ os.open(filename, os.O_RDONLY) ]
while True:
reads, _, _ = select.select(fds, [], [])
if 0 < len(reads):
yield os.read(reads[0], bufsize)
如果您无法将问题限制为基于行的读取,则需要诉诸块。
这应该有效:
import sys
needle = "needle"
blocks = []
inf = sys.stdin
if len(sys.argv) == 2:
inf = open(sys.argv[1])
while True:
block = inf.read()
blocks.append(block)
if len(blocks) >= 2:
data = "".join((blocks[-2], blocks[-1]))
else:
data = blocks[-1]
# attention, this needs to be changed if you are interested
# in *all* matches separately, not if there was any match ata all
if needle in data:
print "found"
blocks = []
blocks[:-2] = []
if block == "":
break
挑战在于确保针匹配,即使它被两个块边界分开。
你可以使用 pytailf :简单的 python tail -f 包装器
from tailf import tailf
for line in tailf("myfile.log"):
print line
据我所知,Python 函数列表中没有相当于“tail”的东西。解决方案是使用tell()(获取文件大小)和read()来计算结束行。
这篇博文(不是我写的)已经写出了函数,看起来很适合我! http://www.manugarg.com/2007/04/real-tailing-in-python.html
以下代码:
inotify
ctypes
库import os
import ctypes
import ctypes.util
# Constants for inotify
IN_MODIFY = 0x00000002
# Load libc
libc = ctypes.CDLL(ctypes.util.find_library('c'))
# Define inotify functions from libc
inotify_init = libc.inotify_init
inotify_add_watch = libc.inotify_add_watch
inotify_read = libc.read
# Define the tail_file function
def tail_file(filepath):
# Initialize inotify
inotify_fd = inotify_init()
# Add a watch for file modification events
wd = inotify_add_watch(inotify_fd, filepath.encode(), IN_MODIFY)
# Open the file and move to the end
with open(filepath, 'r') as file:
file.seek(0, os.SEEK_END)
while True:
# Create a buffer to store inotify events
buffer = os.read(inotify_fd, 1024) # This will block until the file is modified
# Read and print any new lines from the file
line = file.readline()
if line:
print(line, end='')
tail_file('your_file.txt')
如果您只需要一个非常简单的 Python 3 解决方案来处理编写的文本文件的行,并且不需要 Windows 支持,那么这对我来说效果很好:
import subprocess
def tailf(filename):
#returns lines from a file, starting from the beginning
command = "tail -n +1 -F " + filename
p = subprocess.Popen(command.split(), stdout=subprocess.PIPE, universal_newlines=True)
for line in p.stdout:
yield line
for line in tailf("logfile"):
#do stuff
它会阻塞等待写入新行,因此如果不进行一些修改,这不适合异步使用。
您可以使用
collections.deque
来实现tail。
来自 http://docs.python.org/library/collections.html#deque-recipes ...
def tail(filename, n=10):
'Return the last n lines of a file'
return deque(open(filename), n)
当然,这会读取整个文件内容,但这是实现 tail 的一种简洁的方式。