主要想法是: 開啟檔案, 把指標移到檔案最後, 然後有資料則輸出資料, 無資料則休眠一段時間.
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: f.seek(0, 2) # 从文件结尾处开始seek while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
之後只要做如下呼叫即可:
python xxx.py filename
tail -f
預設先讀取最後10行資料,再從檔案尾部讀取即時資料.如果對於小檔案,可以先讀取所有檔案內容,並輸出最後10行,但是讀取全文再獲取最後10行的性能不高, 而從後滾10行的邊界條件也很複雜, 先看先讀取全文再獲取最後10行的實現:
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: self.read_last_line(f) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, f): last_lines = f.readlines()[-10:] for line in last_lines: self.output(line) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
可以看到實現很簡單, 相比第一版只多了個read_last_line的函數
, 接下來就要解決性能的問題了, 當文件很大的時候, 這個邏輯是不行的, 特別是有些日誌檔常有幾個G大, 如果全讀出來內存就爆了. 而在Linux系統中, 沒有一個接口可以指定指針跳到倒數10行, 只能使用如下方法來模擬輸出倒數10行:
首先遊標跳到最新的字元, 儲存目前遊標, 然後預估一行資料的字元長度, 最好偏多, 這裡我按1024字元長度為一行來處理
然後利用seek的方法,跳到seek(-1024 * 10, 2)的字元, 這就是我們預估的倒數10行內的內容
#接著對內容進行判斷, 如果跳轉的字元長度小於10 * 1024, 則證明整個檔案沒有10行, 則採用原來的read_last_line
方法.
如果跳到字元長度等於1024 * 10, 則利用換行符計算已取字元長度共有多少行,如果行數大於10,那隻輸出最後10行,如果只讀了4行,則繼續讀6*1024,直到讀滿10行為止
透過以上步奏, 就把倒數10行的資料計算好了可以列印出來, 可以進入追加資料了, 但是這時候文件內容可能改變了, 我們的遊標也發生改變了, 這時候要把遊標跳回到剛才保存的遊標,防止漏打或者重複打印數據.
分析完畢後, 就可以開始重構read_last_line
函數了.
import time import sys from typing import Callable, List, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line def __call__(self, n: int = 10): with open(self.file_name) as f: self.read_last_line(f, n) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, file, n): read_len: int = self.len_line * n # 跳转游标到最后 file.seek(0, 2) # 获取当前结尾的游标位置 now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标 last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break # 跳转到我们预估的字符位置 file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
可以發現即時讀取那區塊的邏輯效能還是很差, 如果每秒讀一次文件,實時性就太慢了,把間隔改小了,則處理器佔用太多. 性能最好的情況是如果能得知文件更新再進行打印文件, 那性能就能得到保障了.慶幸的是,在Linux中inotify
提供了這樣的功能. 此外,日誌文件有一個特點就是會進行logrotate,如果日誌被logrotate了,那我們就需要重新打開文件,並進一步讀取資料, 這種情況也可以利用到inotify
, 當inotify
取得到檔案重新開啟的事件時,我們就重新開啟檔案,再讀取.
import os import sys from typing import Callable, List, NoReturn import pyinotify multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 监控多个事件 class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件处理类,注意继承 """ 执行inotify event的封装 """ f: 'open()' filename: str path: str wm: 'pyinotify.WatchManager' output: Callable def my_init(self, **kargs): """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化""" # 获取文件 filename: str = kargs.pop('filename') if not os.path.exists(filename): raise RuntimeError('Not Found filename') if '/' not in filename: filename = os.getcwd() + '/' + filename index = filename.rfind('/') if index == len(filename) - 1 or index == -1: raise RuntimeError('Not a legal path') self.f = None self.filename = filename self.output: Callable = kargs.pop('output') self.wm = kargs.pop('wm') # 只监控路径,这样就能知道文件是否移动 self.path = filename[:index] self.wm.add_watch(self.path, multi_event) def read_line(self): """统一的输出方法""" for line in self.f.readlines(): self.output(line) def process_IN_MODIFY(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取""" if event.pathname == self.filename: self.read_line() def process_IN_MOVE_SELF(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取""" if event.pathname == self.filename: # 检测到文件被移动重新打开文件 self.f.close() self.f = open(self.filename) self.read_line() def __enter__(self) -> 'InotifyEventHandler': self.f = open(self.filename) return self def __exit__(self, exc_type, exc_val, exc_tb): self.f.close() class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line wm = pyinotify.WatchManager() # 创建WatchManager对象 inotify_event_handler = InotifyEventHandler( **dict(filename=file_name, wm=wm, output=output) ) # 实例化我们定制化后的事件处理类, 采用**dict传参数 wm.add_watch('/tmp', multi_event) # 添加监控的目录,及事件 self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier实例化时传入,notifier会自动执行 self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler def __call__(self, n: int = 10): """通过inotify的with管理打开文件""" with self.inotify_event_handle as i: # 先读取指定的行数 self.read_last_line(i.f, n) # 启用inotify的监听 self.notifier.loop() def read_last_line(self, file, n): read_len: int = self.len_line * n # 获取当前结尾的游标位置 file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
可以看到, 從原本的open打開文件改為用inotify打開文件(這時候會調用my_init方法進行初始化), 打開後還是運行我們打開原來n行的代碼, 然後就交給inotify運行. 在inotify運行之前, 我們把重新打開文件方法和打印文件方法都掛載在inotifiy對應的事件裡, 之後inotify運行時, 會根據對應的事件執行對應的方法。
以上是如何用Python完成tail指令的詳細內容。更多資訊請關注PHP中文網其他相關文章!