我正在从事具有下一个异步操作的Python项目。我想确保每个级别都有良好的错误处理并返回主循环以重复执行。我想设置为,如果有任何错误,我希望取消 pnding 任务。
import asyncio
async def fetch_data_from_service(service_name):
await asyncio.sleep(1) # Simulating I/O operation
if service_name == "ServiceB":
raise Exception(f"Error fetching data from {service_name}")
return f"Data from {service_name}"
async def process_data(data):
await asyncio.sleep(1) # Simulating data processing
if data == "Data from ServiceC":
raise Exception("Error processing data from ServiceC")
return f"Processed {data}"
async def main_task():
try:
dataA = await fetch_data_from_service("ServiceA")
dataB = await fetch_data_from_service("ServiceB")
dataC = await fetch_data_from_service("ServiceC")
processedA = await process_data(dataA)
processedB = await process_data(dataB)
processedC = await process_data(dataC)
print(processedA, processedB, processedC)
except Exception as e:
print(f"Exception caught in main_task: {e}")
# How to ensure all pending tasks are canceled if an error occurs?
# How to propagate this error back to the main event loop?
# Running the event loop
if __name__ == "__main__":
try:
asyncio.run(main_task())
except Exception as e:
print(f"Exception caught in event loop: {e}")
我想要高效的嵌套处理,每个级别检查,如果发生错误则任务取消,以及错误重定向到主事件循环。
我将每个操作包装在 try-expect 块中以捕获和管理错误。但是,我不确定取消过程。
我希望管理异步任务时不会出现错误,即使发生错误,也会调用所有任务并转移到主事件循环。
请帮我解决这个问题。
我的第一个观察结果是,您没有利用 asyncio 的优势,因为您的代码当前已编写:您正在创建一个协程并立即安排它运行并等待其完成,然后再与下一个协程重复此操作。也就是说,您没有同时运行多个异步任务,并且处理过程中不会出现重叠。我们将通过创建三个同时运行的任务来解决这个问题。由于您希望在一项任务中引发异常以导致其他任务终止,因此我建议使用任务组。请注意,可以通过让
fetch_data_from_service
任务直接调用 process_data
来简化代码,因此该函数已重命名为 fetch_and_process_data_from_service
。如果 process_data
属于 CPU 密集型,请考虑使用 run_in_executor
在单独的进程中执行此代码。
import asyncio
async def fetch_and_process_data_from_service(service_name):
await asyncio.sleep(1) # Simulating I/O operation
if service_name == "ServiceB":
raise Exception(f"Error fetching data from {service_name}")
return await process_data(f"Data from {service_name}")
async def process_data(data):
await asyncio.sleep(1) # Simulating data processing
if data == "Data from ServiceC":
raise Exception("Error processing data from ServiceC")
return f"Processed {data}"
async def main_task():
try:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(fetch_and_process_data_from_service(service_name))
for service_name in ("ServiceA", "ServiceB", "ServiceC")
]
results = [
task.result() for task in tasks
]
print(results)
except Exception as e:
print(f"Exception caught in main_task: {e}")
raise # propogate the exception back
# Running the event loop
if __name__ == "__main__":
try:
asyncio.run(main_task())
except Exception as e:
print(f"Exception caught in event loop: {e}")
打印:
Exception caught in main_task: unhandled errors in a TaskGroup (1 sub-exception)
Exception caught in event loop: unhandled errors in a TaskGroup (1 sub-exception)
但请注意,我们不知道哪个任务失败以及失败的原因。如果这很重要,我们可以使用 `asyncio.gather' 安排协程作为并发任务运行,但如果其中一个任务引发异常,则有必要显式取消未完成的任务:
import asyncio
async def fetch_and_process_data_from_service(service_name):
await asyncio.sleep(1) # Simulating I/O operation
if service_name == "ServiceB":
raise Exception(f"Error fetching data from {service_name}")
return await process_data(f"Data from {service_name}")
async def process_data(data):
await asyncio.sleep(1) # Simulating data processing
if data == "Data from ServiceC":
raise Exception("Error processing data from ServiceC")
return f"Processed {data}"
async def main_task():
tasks = [
asyncio.create_task(fetch_and_process_data_from_service(service_name))
for service_name in ("ServiceA", "ServiceB", "ServiceC")
]
try:
results = await asyncio.gather(*tasks)
print(results)
except Exception as e:
print(f"Exception caught in main_task: {e}")
for task in tasks:
if not task.done():
task.cancel()
raise # propogate the exception back
# Running the event loop
if __name__ == "__main__":
try:
asyncio.run(main_task())
except Exception as e:
print(f"Exception caught in event loop: {e}")
打印:
Exception caught in main_task: Error fetching data from ServiceB
Exception caught in event loop: Error fetching data from ServiceB