我有一个可以在本地完美运行的代码:
... main.py
from file import my_test
... test.py
async test():
"""some async function"""
await asyncio.sleep(1)
return "test"
import asyncio
my_test = asyncio.run(test())
请注意,测试是在模块级别调用的,但是在azure函数应用程序中,这个简单的代码不起作用。
raise RuntimeError('此事件循环已经在运行')
据我所知,这意味着函数应用程序已经在运行自己的事件循环,所以你不能运行它两次。
所以我尝试开发一个自定义运行函数来捕获事件循环并等待该值(如果存在),否则使用普通的 asyncio.run。
但不知道如何正确制作它,因为它需要在本地和天蓝色中同时工作,并且不能同时异步和非异步。
def my_run(coro: Coroutine[Any, Any, T], *args, **kwargs) -> T:
""" Azure uses already asyncio.run and it cannot be nested, so if run in azure we take their event loop"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro, *args, **kwargs)
else:
return loop.run_until_complete(coro(*args, **kwargs))
my_test = my_run(test()) # far from working
有什么想法吗?
提前致谢
您提供的代码无法在 Azure 函数应用程序中运行,因为 Azure 函数主机已在运行其自己的事件循环,并且您无法在 Azure 函数内运行另一个事件循环。
要解决此问题,您可以使用您定义的
my_run()
函数。此函数检查异步事件循环是否已在运行。如果是,则该函数使用现有的事件循环来运行 test()
函数。否则,该函数将启动一个新的事件循环并运行 test()
函数。
我已经尝试过了,下面是httptrigger函数代码
at module level defined async def test() outside of function
我在本地和azure函数应用程序中都成功运行了
import azure.functions as func
import logging
import asyncio
from typing import Any, Coroutine
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
async def test():
"""some async function"""
await asyncio.sleep(1)
return "Application Run Successfully"
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request')
def my_run(coro: Coroutine[Any, Any, Any], *args, **kwargs) -> Any:
"""Azure uses already asyncio.run and it cannot be nested, so if run in Azure, we take their event loop"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro, *args, **kwargs)
else:
return loop.run_until_complete(coro(*args, **kwargs))
result = my_run(test()) # Call your async function using my_run
print(result)
return func.HttpResponse(result)
本地运行
部署在Azure Function应用程序中