使用文件输入停止异步程序

问题描述 投票:0回答:1

在下面的 Python 3.12 示例中需要更改哪些具体代码,才能使程序

myReader.py
在每次出现“Stop,该死的!”行时成功停止。由程序打印到
sourceFile.txt
myWriter.py

问题:

问题是

myReader.py
有时只会在“停止,该死!”这句话时停止。被打印成
sourceFile.txt

一个解决方法是让

myWriter.py
继续写“停下来,该死!”一次又一次地
sourceFile.txt
。 这可能会导致
myReader.py
最终停止。 但问题是
myWriter.py
必须在任意长的时间内继续写入同一行。 我们测试了持续15分钟。 但在某些情况下,
myWriter.py
可能需要继续写“停止,该死!”每秒 30 分钟。 可能还有其他时候,
myWriter.py
可能需要继续写“停下来,该死!”每秒只有一两分钟。

问题似乎是

myReader.py
发出的 API 调用需要不同的时间才能返回,因此积压有时会变得任意长,但并非总是如此。 而且
myReader.py
循环似乎无法看到“停止,该死!”除非且直到许多异步 API 调用任务完成。

理想的解决方案是让

myReader.py
实际听到并回应“停止,该死!”的文字。而不是需要“停下来,该死!”写了很多次了。

作家计划:

myWriter.py
程序写了很多东西。 但
myWriter.py
写入停止命令的相关部分是:

import time
#Repeat 900 times to test output. Sleep for 1 second between each.
for i in range(900):
  writeToFile("Stop, damnit!")
  time.sleep(1)

读者计划:

myReader.py
的相关部分如下:

import os
import platform
import asyncio
import aiofiles

BATCH_SIZE = 10

def get_source_file_path():
  if platform.system() == 'Windows':
    return 'C:\\path\\to\\sourceFile.txt'
  else:
    return '/path/to/sourceFile.txt'

async def send_to_api(linesBuffer):
  success = runAPI(linesBuffer)
  return success

async def read_source_file():
  source_file_path = get_source_file_path()
  counter = 0
  print("Reading source file...")
  print("source_file_path: ", source_file_path)
  #Detect the size of the file located at source_file_path and store it in the variable file_size.
  file_size = os.path.getsize(source_file_path)
  print("file_size: ", file_size)
  taskCountList = []

  background_tasks = set()

  async with aiofiles.open(source_file_path, 'r') as source_file:
    await source_file.seek(0, os.SEEK_END)
    linesBuffer = []
    while True:
      # Always make sure that file_size is the current size:
      line = await source_file.readline()
      new_file_size = os.path.getsize(source_file_path)
      if new_file_size < file_size:
        print("The file has been truncated.")
        print("old file_size: ", file_size)
        print("new_file_size: ", new_file_size)
        await source_file.seek(0, os.SEEK_SET)
        file_size = new_file_size
        # Allocate a new list instead of clearing the current one
        linesBuffer = []
        counter = 0
        continue
      line = await source_file.readline()
      if line:
        new_line = str(counter) + " line: " + line
        print(new_line)
        linesBuffer.append(new_line)
        print("len(linesBuffer): ", len(linesBuffer))
        if len(linesBuffer) == BATCH_SIZE:
          print("sending to api...")
          task = asyncio.create_task(send_to_api(linesBuffer))
          background_tasks.add(task)
          task.add_done_callback(background_tasks.discard)
          pendingTasks = len(background_tasks)
          taskCountList.append(pendingTasks)
          print("")
          print("pendingTasks: ", pendingTasks)
          print("")
          # Do not clear the buffer; allocate a new one:
          linesBuffer = []
          counter += 1
          print("counter: ", counter)
        #detect whether or not the present line is the last line in the file.
        # If it is the last line in the file, then write whatever batch
        # we have even if it is not complete.
        if "Stop, damnit!" in line:
          #Print the next line 30 times to simulate a large file.
          for i in range(30):
            print("LAST LINE IN FILE FOUND.")
            #sleep for 1 second to simulate a large file.
            await asyncio.sleep(1)
          #Omitting other stuff for brevity.
          break
      else:
          await asyncio.sleep(0.1)

async def main():
  await read_source_file()

if __name__ == '__main__':
  asyncio.run(main())
python python-3.x python-asyncio python-aiofiles
1个回答
0
投票

须知:

  • 至少在我的测试中,如果没有立即找到新行,这一行
    line = await source_file.readline()
    将很乐意返回一个空字符串。
  • 主循环中有两次
    line = await source_file.readline()
    ,第一次调用的结果被抛出

第一个调用将返回一行,第二个调用返回空字符串,因为没有其他内容可读取。因此,您在第一次调用中读取了

"Stop, damnit!"
,然后再次调用
readline
并得到空字符串。

您可以通过修改您的作者来验证这一点:

writeToFile("Stop, damnit!\nStop, damnit!")
(也许是
\r\n
)。这样,您可以一次将两行放入文件中,第二次调用
readline
实际上会读取一些内容,并且停止检查实际上会看到该消息。

© www.soinside.com 2019 - 2024. All rights reserved.