如何使用Python asyncio lib高效处理嵌套异步操作?

问题描述 投票:0回答:1

我正在从事具有下一个异步操作的Python项目。我想确保每个级别都有良好的错误处理并返回主循环以重复执行。我想设置为,如果有任何错误,我希望取消 pnding 任务。

import asyncio

async def fetch_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return f"Data from {service_name}"

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    try:
        dataA = await fetch_data_from_service("ServiceA")
        dataB = await fetch_data_from_service("ServiceB")
        dataC = await fetch_data_from_service("ServiceC")
        
        processedA = await process_data(dataA)
        processedB = await process_data(dataB)
        processedC = await process_data(dataC)

        print(processedA, processedB, processedC)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        # How to ensure all pending tasks are canceled if an error occurs?
        # How to propagate this error back to the main event loop?

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

我想要高效的嵌套处理,每个级别检查,如果发生错误则任务取消,以及错误重定向到主事件循环。

我将每个操作包装在 try-expect 块中以捕获和管理错误。但是,我不确定取消过程。

我希望管理异步任务时不会出现错误,即使发生错误,也会调用所有任务并转移到主事件循环。

请帮我解决这个问题。

python error-handling python-asyncio
1个回答
0
投票

我的第一个观察结果是,您没有利用 asyncio 的优势,因为您的代码当前已编写:您正在创建一个协程并立即安排它运行并等待其完成,然后再与下一个协程重复此操作。也就是说,您没有同时运行多个异步任务,并且处理过程中不会出现重叠。我们将通过创建三个同时运行的任务来解决这个问题。由于您希望在一项任务中引发异常以导致其他任务终止,因此我建议使用任务组。请注意,可以通过让

fetch_data_from_service
任务直接调用
process_data
来简化代码,因此该函数已重命名为
fetch_and_process_data_from_service
。如果
process_data
属于 CPU 密集型,请考虑使用
run_in_executor
在单独的进程中执行此代码。

import asyncio

async def fetch_and_process_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return await process_data(f"Data from {service_name}")

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_and_process_data_from_service(service_name))
                for service_name in ("ServiceA", "ServiceB", "ServiceC")
            ]

        results = [
            task.result() for task in tasks
        ]
        print(results)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        raise  # propogate the exception back

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

打印:

Exception caught in main_task: unhandled errors in a TaskGroup (1 sub-exception)
Exception caught in event loop: unhandled errors in a TaskGroup (1 sub-exception)

但请注意,我们不知道哪个任务失败以及失败的原因。如果这很重要,我们可以使用 `asyncio.gather' 安排协程作为并发任务运行,但如果其中一个任务引发异常,则有必要显式取消未完成的任务:

import asyncio

async def fetch_and_process_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return await process_data(f"Data from {service_name}")

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    tasks = [
        asyncio.create_task(fetch_and_process_data_from_service(service_name))
        for service_name in ("ServiceA", "ServiceB", "ServiceC")
    ]

    try:
        results = await asyncio.gather(*tasks)
        print(results)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        for task in tasks:
            if not task.done():
                task.cancel()
        raise  # propogate the exception back

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

打印:

Exception caught in main_task: Error fetching data from ServiceB
Exception caught in event loop: Error fetching data from ServiceB
© www.soinside.com 2019 - 2024. All rights reserved.