Stopping and resuming a process using a semaphore

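I want to implement an API that uses asynchronous events to stop and resume an asynchronous task. The API will support the following methods:

  • /import (start)
  • /stop
  • /resume

The main problem occurs when /stop is called: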

File "/src/srv.py", line 131, in __stop
    loop.run_until_complete(self.__proc.stop())
RuntimeError: This event loop is already running
/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py:-1: RuntimeWarning: coroutine 'Application.stop' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I tried using await in place of run_until_complete, but got this output:

start
# here i call /stop
end
start_stop
end_stop

I expected this output:

start
# here i call /stop
start_stop
# here i call /resume
end

I used the following code:

srv.py

import asyncio
from fastapi import FastAPI, APIRouter
from uvicorn import run
from app import Application

class Service:
    def __init__(self):
        self.__app = FastAPI(title='api')
        self.__router = APIRouter()
        self.__router.add_api_route('/import', self.__import, methods=['POST'])
        self.__router.add_api_route('/stop', self.__stop, methods=['POST', 'GET'])
        self.__router.add_api_route('/resume', self.__resume, methods=['POST', 'GET'])
        self.__app.include_router(self.__router)
        self.__proc = Application()

    async def __import(self):
        loop = asyncio.get_running_loop()
        loop.create_task(self.__proc.import_data())

    async def __stop(self) -> None:
        loop = asyncio.get_running_loop()
        loop.run_until_complete(self.__proc.stop())

    async def __resume(self) -> None:
        self.__proc.resume()

    def run(self) -> None:
        run(app=self.__app, host='0.0.0.0', port=8000)


if __name__ == '__main__':
    Service().run()

app.py

import asyncio


class Application:
    def __init__(self):
        self.__state = True
        self.__semaphore = asyncio.Semaphore(1)

    async def import_data(self):
        while True:
            async with self.__semaphore:
                print('start')
                await asyncio.sleep(5)
                print('end')

    def resume(self):
        self.__state = True

    async def stop(self):
        self.__state = False
        while not self.__state:
            async with self.__semaphore:
                print('start_stop')
                await asyncio.sleep(5)
                print('end_stop')

python python-asyncio fastapi semaphore

1 answer

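The error occurs because run_until_complete is called while the event loop is already running. Uvicorn already manages the event loop for you. Instead of trying to drive the loop manually, you can simply await the stop coroutine in the handler: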

srv.py

async def __stop(self) -> None:
    await self.__proc.stop()

async def __import(self):
    asyncio.create_task(self.__proc.import_data())
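
To see why the original call fails, here is a minimal standalone sketch that needs neither FastAPI nor Uvicorn. The names `work`, `handler_wrong`, and `handler_right` are hypothetical stand-ins. It calls `run_until_complete` from inside a coroutine, which is the situation a request handler is in under Uvicorn, and then awaits the same kind of coroutine directly.

```python
import asyncio


async def work() -> str:
    # Hypothetical stand-in for Application.stop(); yields to the loop once.
    await asyncio.sleep(0)
    return "done"


async def handler_wrong() -> str:
    loop = asyncio.get_running_loop()
    coro = work()
    try:
        # The loop is already running this coroutine, so it refuses
        # to be started a second time.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid the "coroutine was never awaited" warning
        return str(exc)
    return "no error"


async def handler_right() -> str:
    # Inside a coroutine, simply await the other coroutine.
    return await work()


wrong_result = asyncio.run(handler_wrong())
right_result = asyncio.run(handler_right())
print(wrong_result)
print(right_result)
```

The first call reports the same RuntimeError as in the question's traceback, while the second completes normally.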

app.py

import asyncio


class Application:
    def __init__(self):
        self._running = True   # set to False to end the import loop
        self._paused = False   # toggled by stop() and resume()

    async def import_data(self):
        while self._running:
            if not self._paused:
                print('start')
                await asyncio.sleep(5)
                print('end')
            else:
                # Sleep briefly while paused to avoid busy waiting:
                # https://superfastpython.com/thread-busy-waiting-in-python/
                await asyncio.sleep(1)

    def resume(self):
        self._paused = False

    async def stop(self):
        self._paused = True

This avoids blocking the event loop and lets the API pause and resume the task.
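
As an alternative to polling a flag, the pause could be modelled with `asyncio.Event`, so the worker waits until it is resumed instead of waking up every second. This is a sketch of a different technique, not the code from the answer: the class name `PausableWorker`, its `steps` and `delay` parameters, and the `log` list are hypothetical, and the delays are shortened so the example finishes quickly.

```python
import asyncio


class PausableWorker:
    """Hypothetical worker that can be paused between steps."""

    def __init__(self, steps, delay=0.01):
        self.steps = steps
        self.delay = delay
        self.log = []
        self._resume_event = asyncio.Event()
        self._resume_event.set()  # start in the "running" state

    async def import_data(self):
        for i in range(self.steps):
            # Waits here while paused; returns at once when the event is set.
            await self._resume_event.wait()
            self.log.append(f"start {i}")
            await asyncio.sleep(self.delay)
            self.log.append(f"end {i}")

    def stop(self):
        self._resume_event.clear()

    def resume(self):
        self._resume_event.set()


async def main():
    worker = PausableWorker(steps=2)
    task = asyncio.create_task(worker.import_data())  # keep a reference
    await asyncio.sleep(0)    # let step 0 begin
    worker.stop()             # takes effect once step 0 has finished
    await asyncio.sleep(0.1)  # step 1 must not start during this time
    paused_log = list(worker.log)
    worker.resume()
    await task
    return paused_log + ["resumed"] + worker.log[len(paused_log):]


result = asyncio.run(main())
print(result)
```

In the service, /stop would call `stop()` and /resume would call `resume()`. As with the flag-based version, a step that is already in progress still runs to completion before the pause takes effect.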
