Running Python 3.12 asynchronous tasks in parallel


What specific changes need to be made to the Python 3.12 code below so that each call to write_to_file(linesBuffer) runs in parallel instead of sequentially?

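In other words:

  1. We want the program to continue executing without waiting for write_to_file(linesBuffer) to return,
  2. but we also want to make sure that every call to write_to_file(linesBuffer) eventually returns.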

Each call to write_to_file(linesBuffer) should start at a different time and return after however long it needs to complete its work successfully. The program should never wait for one call to write_to_file(linesBuffer) to finish before starting the next one.

When we remove await from the await write_to_file(linesBuffer) line, none of the print statements inside write_to_file(linesBuffer) are executed. So we cannot simply change await write_to_file(linesBuffer) to write_to_file(linesBuffer).

The problem with the code is that the many consecutive calls to await write_to_file(linesBuffer) make the program very slow.

Here is the code:

import os
import platform
import asyncio

numLines = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(linesBuffer):
    print("inside Writing to file...")
    with open('newFile.txt', 'a') as new_destination_file:
        for line in linesBuffer:
            new_destination_file.write(line)
    #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    #print every 1 second for 2 seconds.
    for i in range(2):
        print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    with open(source_file_path, 'r') as source_file:
        source_file.seek(0, os.SEEK_END)
        while True:
            line = source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if len(line) > 0:
              new_line = str(counter) + " line: " + line
              print(new_line)
              linesBuffer.append(new_line)
              print("len(linesBuffer): ", len(linesBuffer))
              if len(linesBuffer) >= numLines:
                print("Writing to file...")
                await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.    
                print("awaiting Writing to file...")
                linesBuffer.clear()
              counter += 1
              print("counter: ", counter)
            if not line:
                await asyncio.sleep(0.1)
                continue
            #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
            if source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.  Writing to file...")
                await write_to_file(linesBuffer)
                print("awaiting Writing to file...")
                linesBuffer.clear()
                counter = 0
        
async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())
python python-3.x asynchronous python-asyncio
1 Answer

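A few points:

First, as was commented earlier, the file I/O you are doing is not asynchronous, and asyncio does not itself provide asynchronous file I/O. For that, I suggest installing the aiofiles module from the PyPI repository.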
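If adding a dependency is not an option, one alternative (not what the code further down uses) is to push the blocking file call onto a worker thread with the standard library's asyncio.to_thread. The following is only a minimal sketch: the helper name append_lines and the temporary path are made up for illustration.

```python
import asyncio
import os
import tempfile

def append_lines(path, lines):
    # Ordinary blocking file I/O; it runs in a worker thread, not on the event loop.
    with open(path, "a") as f:
        f.writelines(lines)

async def main(path):
    # to_thread returns an awaitable; the event loop stays free while the thread writes.
    await asyncio.to_thread(append_lines, path, ["first\n", "second\n"])

# Hypothetical destination inside a fresh temporary directory.
path = os.path.join(tempfile.mkdtemp(), "newFile.txt")
asyncio.run(main(path))

with open(path) as f:
    content = f.read()
```

Either approach keeps the event loop responsive while a write is in progress; which one fits better depends on how much of the file handling you want to express with async/await.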

Second, you have the following:

await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.

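Indeed, without the await, the body of write_to_file never runs. The expression write_to_file(linesBuffer) only returns a coroutine object; to run it you have to await that coroutine, as you are currently doing. But this invocation is effectively synchronous: the caller is suspended, the coroutine runs, and once it completes and returns a value (an implicit None if there is no return statement), the caller resumes with await write_to_file(linesBuffer) evaluating to that return value.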
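The difference between creating a coroutine object and awaiting it can be seen in a small sketch. The write_to_file below is a simplified stand-in that only records its calls, not the function from the question.

```python
import asyncio

calls = []  # records each time the coroutine body actually runs

async def write_to_file(lines_buffer):
    # Simplified stand-in for the real writer: it only records the call.
    calls.append(list(lines_buffer))
    return len(lines_buffer)

async def main():
    coro = write_to_file(["a\n", "b\n"])  # creates a coroutine object; the body has not run
    ran_before_await = len(calls)          # nothing has been recorded yet
    result = await coro                    # the body runs now; the caller waits for it to return
    return ran_before_await, result

ran_before_await, result = asyncio.run(main())
print(ran_before_await, result)  # prints: 0 2
```

Dropping the await on a bare call therefore discards the coroutine object without ever running it, which is why none of the print statements appear.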

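But you want write_to_file to run asynchronously (concurrently) with your read_source_file coroutine. To do that, you need to create a separate task. See asyncio.create_task for details. In particular, make sure to save the task instance returned by this call, because the event loop keeps only a weak reference to it and an unreferenced task may be garbage collected before it finishes.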
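As a minimal sketch of that pattern, assuming a simplified write_to_file that just sleeps: each task is stored in a set, removes itself when done, and the producer waits for any stragglers before returning. The names producer and events are illustrative only.

```python
import asyncio

background_tasks = set()   # strong references keep pending tasks alive
events = []                # order in which things happen, for illustration

async def write_to_file(lines):
    events.append(f"start {lines}")
    await asyncio.sleep(0.05)      # stands in for slow work
    events.append(f"done {lines}")

async def producer():
    for batch in (["a"], ["b"]):
        task = asyncio.create_task(write_to_file(batch))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        events.append(f"scheduled {batch}")
        await asyncio.sleep(0)     # yield so the new task can start
    # Wait for whatever is still running before leaving.
    await asyncio.gather(*background_tasks)

asyncio.run(producer())
```

The second batch starts before the first has finished, so the producer never waits for one write before launching the next, and the final gather makes sure every call has returned before the program exits.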

So basically your modified code would be as follows (I have not verified that its overall logic is correct):

import os
import platform
import asyncio
import aiofiles

numLines = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(linesBuffer):
    print("inside Writing to file...")
    async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
        for line in linesBuffer:
            await new_destination_file.write(line)
    #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    #print every 1 second for 2 seconds.
    for i in range(2):
        print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    
    background_tasks = set()
    
    async with aiofiles.open(source_file_path, 'r') as source_file:
        # aiofiles file methods are coroutines, so seek() must be awaited to take effect.
        await source_file.seek(0, os.SEEK_END)
        while True:
            line = await source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if len(line) > 0:
              new_line = str(counter) + " line: " + line
              print(new_line)
              linesBuffer.append(new_line)
              print("len(linesBuffer): ", len(linesBuffer))
              if len(linesBuffer) >= numLines:
                print("Writing to file...")
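                # Hand the task its own copy: the task has not started yet, so clearing
                # the shared list below would otherwise leave it nothing to write.
                task = asyncio.create_task(write_to_file(linesBuffer.copy()))
                background_tasks.add(task)  # a set has add(), not append()
                task.add_done_callback(background_tasks.discard)
                linesBuffer.clear()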
              counter += 1
              print("counter: ", counter)
            if not line:
                await asyncio.sleep(0.1)
                continue
            #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
            if await source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.  Writing to file...")
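                # Pass a copy here too, for the same reason as above.
                task = asyncio.create_task(write_to_file(linesBuffer.copy()))
                background_tasks.add(task)  # a set has add(), not append()
                task.add_done_callback(background_tasks.discard)
                linesBuffer.clear()
                counter = 0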
                counter = 0
        
async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())
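One thing to watch for in code of this shape: asyncio.create_task only schedules the coroutine, so a list that is cleared right after scheduling is already empty by the time the task reads it. The sketch below, with a simplified write_to_file that records what it received, contrasts passing the shared list with passing a copy.

```python
import asyncio

written = []  # what each task actually saw when it ran

async def write_to_file(lines):
    written.append(list(lines))

async def main():
    buffer = ["x\n", "y\n"]
    shared = asyncio.create_task(write_to_file(buffer))         # same list object
    copied = asyncio.create_task(write_to_file(buffer.copy()))  # snapshot taken now
    buffer.clear()               # runs before either task has started
    await asyncio.gather(shared, copied)

asyncio.run(main())
print(written)  # prints: [[], ['x\n', 'y\n']]
```

Only the task that received a copy still has lines to write, so each background write should be given its own snapshot of the buffer.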