在下面的 Python 3.12 示例中需要更改哪些具体代码,才能使程序
myReader.py
在每次出现“Stop,该死的!”行时成功停止。由程序打印到 sourceFile.txt
myWriter.py
?
问题:
问题是
myReader.py
有时只会在“停止,该死!”这句话时停止。被打印成 sourceFile.txt
。
一个解决方法是让
myWriter.py
继续写“停下来,该死!”一次又一次地sourceFile.txt
。 这可能会导致 myReader.py
最终停止。 但问题是 myWriter.py
必须在任意长的时间内继续写入同一行。 我们测试了持续15分钟。 但在某些情况下,myWriter.py
可能需要继续写“停止,该死!”每秒 30 分钟。 可能还有其他时候,myWriter.py
可能需要继续写“停下来,该死!”每秒只有一两分钟。
问题似乎是
myReader.py
发出的 API 调用需要不同的时间才能返回,因此积压有时会变得任意长,但并非总是如此。 而且 myReader.py
循环似乎无法看到“停止,该死!”除非且直到许多异步 API 调用任务完成。
理想的解决方案是让
myReader.py
实际听到并回应“停止,该死!”的文字。而不是需要“停下来,该死!”写了很多次了。
作家计划:
myWriter.py
程序写了很多东西。 但myWriter.py
写入停止命令的相关部分是:
import time
#Repeat 900 times to test output. Sleep for 1 second between each.
for i in range(900):
writeToFile("Stop, damnit!")
time.sleep(1)
读者计划:
myReader.py
的相关部分如下:
import os
import platform
import asyncio
import aiofiles
BATCH_SIZE = 10
def get_source_file_path():
if platform.system() == 'Windows':
return 'C:\\path\\to\\sourceFile.txt'
else:
return '/path/to/sourceFile.txt'
async def send_to_api(linesBuffer):
success = runAPI(linesBuffer)
return success
async def read_source_file():
source_file_path = get_source_file_path()
counter = 0
print("Reading source file...")
print("source_file_path: ", source_file_path)
#Detect the size of the file located at source_file_path and store it in the variable file_size.
file_size = os.path.getsize(source_file_path)
print("file_size: ", file_size)
taskCountList = []
background_tasks = set()
async with aiofiles.open(source_file_path, 'r') as source_file:
await source_file.seek(0, os.SEEK_END)
linesBuffer = []
while True:
# Always make sure that file_size is the current size:
line = await source_file.readline()
new_file_size = os.path.getsize(source_file_path)
if new_file_size < file_size:
print("The file has been truncated.")
print("old file_size: ", file_size)
print("new_file_size: ", new_file_size)
await source_file.seek(0, os.SEEK_SET)
file_size = new_file_size
# Allocate a new list instead of clearing the current one
linesBuffer = []
counter = 0
continue
line = await source_file.readline()
if line:
new_line = str(counter) + " line: " + line
print(new_line)
linesBuffer.append(new_line)
print("len(linesBuffer): ", len(linesBuffer))
if len(linesBuffer) == BATCH_SIZE:
print("sending to api...")
task = asyncio.create_task(send_to_api(linesBuffer))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
pendingTasks = len(background_tasks)
taskCountList.append(pendingTasks)
print("")
print("pendingTasks: ", pendingTasks)
print("")
# Do not clear the buffer; allocate a new one:
linesBuffer = []
counter += 1
print("counter: ", counter)
#detect whether or not the present line is the last line in the file.
# If it is the last line in the file, then write whatever batch
# we have even if it is not complete.
if "Stop, damnit!" in line:
#Print the next line 30 times to simulate a large file.
for i in range(30):
print("LAST LINE IN FILE FOUND.")
#sleep for 1 second to simulate a large file.
await asyncio.sleep(1)
#Omitting other stuff for brevity.
break
else:
await asyncio.sleep(0.1)
async def main():
await read_source_file()
if __name__ == '__main__':
asyncio.run(main())
须知:
line = await source_file.readline()
将很乐意返回一个空字符串。line = await source_file.readline()
,第一次调用的结果被抛出第一个调用将返回一行,第二个调用返回空字符串,因为没有其他内容可读取。因此,您在第一次调用中读取了
"Stop, damnit!"
,然后再次调用 readline
并得到空字符串。
您可以通过修改您的作者来验证这一点:
writeToFile("Stop, damnit!\nStop, damnit!")
(也许是\r\n
)。这样,您可以一次将两行放入文件中,第二次调用 readline
实际上会读取一些内容,并且停止检查实际上会看到该消息。