我有一段代码,其作用是将数据保存到文本文件中。我还定义了一个函数来清除文件。不幸的是,清除文件后,不再保存任何数据。我正在保存 iperf 的输出。
def start_client_and_print_results(self):
'''Start iperf client and print results to console'''
my_iperf_process = subprocess.Popen([self.initalize_iperf, "-c", self.ip_addr, self.force_flush],stdout=subprocess.PIPE)
for line in my_iperf_process.stdout:
print(line)
self.starting_index_position += 1
if (self.starting_index_position > self.start_line_to_write_into_file and self.starting_index_position < self.end_line):
self.save_results_to_file_txt(str(line) + "\n")
self.save_results_to_file_csv(str(line) + "\n")
用于保存数据的函数:
def save_results_to_file_txt(self, output):
'''Save results to a text file'''
client_results = open("client_results.txt", "a")
client_results.write(output)
client_results.close()
清除数据:
def clear_file(self):
clear = open("client_results.txt", "w")
clear.close()
上述功能与 tkinter 库一起使用,我为其定义了具有适当功能的按钮。
button_iperf = tk.Button(root, text = "Start iperf client", command = iperfing.start_client_and_print_results)
button_clear = tk.Button(root, text = "Clear", command = iperfing.clear_file)
有什么想法吗?
我尝试在文件定义中使用 with 语句,但没有成功。
函数在类中定义。
class Iperf():
'''Class to handle iperf client functionality'''
def __init__(self, initalize_iperf: str, force_flush: str, ip_addr: str, starting_index_position: int, start_line_to_write_into_file: int, end_line: int):
'''Initialize the iperf client instance'''
self.initalize_iperf = initalize_iperf
self.force_flush = force_flush
self.ip_addr = ip_addr
self.starting_index_position = starting_index_position
self.start_line_to_write_into_file = start_line_to_write_into_file
self.end_line = end_line
完整的代码如下所示:
class Iperf():
'''Class to handle iperf client functionality'''
def __init__(self, initalize_iperf: str, force_flush: str, ip_addr: str, starting_index_position: int, start_line_to_write_into_file: int, end_line: int):
'''Initialize the iperf client instance'''
self.initalize_iperf = initalize_iperf
self.force_flush = force_flush
self.ip_addr = ip_addr
self.starting_index_position = starting_index_position
self.start_line_to_write_into_file = start_line_to_write_into_file
self.end_line = end_line
def start_client_and_print_results(self):
'''Start iperf client and print results to console'''
my_iperf_process = subprocess.Popen([self.initalize_iperf, "-c", self.ip_addr, self.force_flush],stdout=subprocess.PIPE)
for line in my_iperf_process.stdout:
print(line)
self.starting_index_position += 1
if (self.starting_index_position > self.start_line_to_write_into_file and self.starting_index_position < self.end_line):
self.save_results_to_file_txt(str(line) + "\n")
self.save_results_to_file_csv(str(line) + "\n")
my_iperf_process.kill()
def start_server(self):
'''Start iperf server'''
my_iperf_process = subprocess.Popen(["iperf3","-s","-B", self.ip_addr, self.force_flush],stdout=subprocess.PIPE)
def save_results_to_file_txt(self, output):
'''Save results to a text file'''
client_results = open("client_results.txt", "a")
client_results.write(output)
client_results.close()
def clear_file(self):
clear = open("client_results.txt", "w")
clear.close()
我在课堂上使用这些参数。
iperf_path = "iperf3"
force_flush = "--forceflush"
ip_addr = "" # Example IP address
starting_index_position = 0
start_line = 3
end_line = 14
好吧,我设法找到了一个如下所示的解决方法。我不通过 Python 处理文件保存,而是直接从终端进行:
class Iperf:
'''Class to handle iperf client functionality'''
def __init__(self, initialize_iperf: str, force_flush: str, ip_addr: str, starting_index_position: int, start_line_to_write_into_file: int, end_line: int):
'''Initialize the iperf client instance'''
self.initialize_iperf = initialize_iperf
self.force_flush = force_flush
self.ip_addr = ip_addr
self.starting_index_position = starting_index_position
self.start_line_to_write_into_file = start_line_to_write_into_file
self.end_line = end_line
self.my_iperf_process = None # Hold reference to the subprocess
def start_client_and_print_results(self):
'''Start iperf client and print results to console'''
subprocess.run("iperf3 -c ip-address --forceflush > client_results.txt", shell=True)
def start_server(self):
'''Start iperf server'''
my_iperf_process = subprocess.Popen(["iperf3", "-s", "-B", self.ip_addr, self.force_flush], stdout=subprocess.PIPE)
def clear_file(self):
'''Delete file'''
os.remove("client_results.txt")
# Example usage:
iperf_instance = Iperf("iperf3", "--forceflush", "ip-address", 0, 5, 20)
iperf_instance.start_client_and_print_results()
iperf_instance.clear_file()
iperf_instance.start_client_and_print_results()
所以即使删除文件后,我也可以再次覆盖它。感谢大家抽出时间!也许我们也会找到另一种解决方案