我有一个在 uvicorn 服务器内运行的 python 应用程序。我已经创建了一个 Pub/sub 订阅者并尝试从我的 main.py 中启用它。我正在使用流式拉取订阅。现在,我的要求是,一旦创建了订阅者,控制权应该返回到 main.py,而不是在订阅者监听事件时被阻止
我的订阅者的代码如下-
from google.cloud import pubsub_v1
from app.services.subscription_service import save_bill_events
from app.utils import constants
from app.utils.logging_tracing_manager import get_logger
import traceback
print("Entered in bill_subscriber----------------------")
logger = get_logger(__file__)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
save_bill_events(message.data)
message.ack()
async def create_bill_subscriber():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("{projectId}",
constants.BILL_EVENT_SUBSCRIPTION_ID)
# Limit the subscriber to only have fixed number of outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=50)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result()
except Exception as e:
# Even in case of an exception, subscriber should keep listening
logger.error(
f"An error occurred while pulling message from subscription {constants.BILL_EVENT_SUBSCRIPTION_ID}",
exc_info=True)
traceback.print_exc()
pass
从我的 main.py 中,我尝试使用 asyncio 调用上述方法
asyncio.run(main=bill_subscriber.create_bill_subscriber())
但我看到一个错误
RuntimeError: asyncio.run() cannot be called from a running event loop
。我是否没有正确使用asyncio.run()
?
uvicorn 是否有可能在事件循环内运行应用程序,因此我们无法启动另一个事件循环?如果是这样的话,还有其他方法可以在后台启动订阅者吗?
uvicorn 是否有机会在事件循环中运行应用程序 因此我们无法启动另一个事件循环?如果是这样的话,就是 还有其他方法可以在后台启动订阅者吗?
是的,
uvicorn
的整个想法是它运行一个异步循环,并且可以选择将它们的视图编写为异步协同例程 - 然后它确实为每个请求使用一个异步任务,而不是更昂贵的,系统资源术语,线程。
可以简单地将新任务添加到 uvicorn 使用的正在运行的 asyncio 循环中,并且只需小心处理在“作为请求视图的任务”默认值之外创建的任务的踪迹。 Python 的 asyncio 模型实际上就是为此而设计的。
您只需更改对函数的调用,这样 它从内部工作并且已经在运行循环
import asyncio
...
my_tasks = set()
def someview(...):
...
# instead of:
# asyncio.run(main=bill_subscriber.create_bill_subscriber())
task = asyncio.create_task(bill_subscriber.create_bill_subscriber())
my_tasks.add(task)
task.add_done_callback(my_tasks.discard)
...
使用一个集合来保存在视图返回后应继续运行的任务的想法是,asyncio 需要您保留对此类任务的引用。上面的回调在完成时删除对其的引用应该足以避免任何资源泄漏。
此外,根据您在任务中执行的操作,您可能希望在自定义上下文中运行 - 检查 https://docs.python.org/3/library/contextvars.html 以了解该部分,如果你觉得需要它。