我希望实现一个利用异步事件的 API 来停止/恢复异步任务。该API将支持以下方法:
主要问题发生在调用方法 /stop 时:
File "/src/srv.py", line 131, in __stop
loop.run_until_complete(self.__proc.stop())
RuntimeError: This event loop is already running
/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py:-1: RuntimeWarning: coroutine 'Application.stop' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
我尝试在 run_until_complete 中使用await,但有输出:
start
# here i call /stop
end
start_stop
end_stop
我期待这个输出
start
# here i call /stop
start_stop
# here i call /resume
end
我使用了以下代码:
srv.py
import asyncio
from fastapi import FastAPI, APIRouter
from app import Application
class Service:
def __init__(self):
self.__app = FastAPI(title='api')
self.__router = APIRouter()
self.__router.add_api_route('/import', self.__import, methods=['POST'])
self.__router.add_api_route('/stop', self.__stop, methods=['POST', 'GET'])
self.__router.add_api_route('/resume', self.__resume, methods=['POST', 'GET'])
self.__app.include_router(self.__router)
self.__proc = Application()
async def __import(self):
loop = asyncio.get_running_loop()
loop.create_task(self.__proc.import_data())
async def __stop(self) -> None:
loop = asyncio.get_running_loop()
loop.run_until_complete(self.__proc.stop())
async def __resume(self) -> None:
self.__proc.resume()
def run(self) -> None:
run(app=self.__app, host='0.0.0.0', port=8000)
if __name__ == '__main__':
Service().run()
应用程序.py
import asyncio
class Application:
def __init__(self):
self.__state = True
self.__semaphore = asyncio.Semaphore(1)
async def import_data(self):
while True:
async with self.__semaphore:
print('start')
await asyncio.sleep(5)
print('end')
def resume(self):
self.__state = True
async def stop(self):
self.__state = False
while not self.__state:
async with self.__semaphore:
print('start_stop')
await asyncio.sleep(5)
print('end_stop')
发生错误是因为事件循环已经运行时正在调用
run_until_complete
。 Uvicorn 已经为您管理事件循环。
您可以在 stop 函数中使用 wait 来暂停任务,而不是尝试手动处理循环:
srv.py
async def __stop(self) -> None:
await self.__proc.stop()
async def __import(self):
asyncio.create_task(self.__proc.import_data())
应用程序.py
async def import_data(self):
while self._running:
if not self._paused:
print('start')
await asyncio.sleep(5)
print('end')
else:
# https://superfastpython.com/thread-busy-waiting-in-python/Sleep
# briefly while paused to avoid busy waiting
await asyncio.sleep(1)
def resume(self):
self._paused = False
async def stop(self):
self._paused = True
这可以避免阻塞事件循环并使 API 能够暂停和恢复。