从 FastAPI 应用程序向 GCP PubSub 发布消息

问题描述 投票:0回答:1

我通过在 GCP 的 Cloud Run 上运行的 FastAPI 应用程序接收来自 Twilio 的 Webhook。该应用程序接收消息并将其发布到 PubSub 主题。该主题有基于电话号码的消息排序。

在某些时候,应用程序会停止响应 Webhooks 并开始返回 504 错误。我无法确定问题出在哪里。我怀疑当我实例化 PubSub 客户端时可能存在一些并发问题。 “恢复”服务的唯一方法是重新启动 Cloud Run 容器。 错误日志中没有显示任何内容。我正在考虑为 PubSub 客户端启用日志记录,但我也不知道该怎么做。

从日志中可以看到,最后一条消息是“Instantiating PubSub client...”,然后 POD 卡住了。

enter image description here

从 google.cloud 导入 pubsub

def get_pub_sub():
    return PubSub("primary-care-378721")


class PubSub:

    def __init__(self, project_name, enable_message_ordering=True) -> None:
        print("Instantiating PubSub client...")
        publisher_options = pubsub.types.PublisherOptions(
            enable_message_ordering=enable_message_ordering
        )
        self.project_name = project_name
        self.publisher = pubsub.PublisherClient(publisher_options=publisher_options)
        print("Done")

    def publish_message(
        self, topic_name: str, message: dict, ordering_key: str
    ) -> None:
        topic_path = self.publisher.topic_path(self.project_name, topic_name)
        data = json.dumps(message).encode("utf-8")
        future = self.publisher.publish(
            topic=topic_path, data=data, ordering_key=ordering_key
        )
        return future.result()

这是快速 API 路由器代码:

@router.post("/incoming-message", status_code=status.HTTP_204_NO_CONTENT)
async def receive_incoming_message(
    request: Request, pub_sub_client=Depends(get_pub_sub)
):
    print("Processing request...")
    form_data = await request.form()
    form_dict = form_data._dict
    ordering_key = form_dict.get("From")

    topic_name = "twilio_incoming_message-de6f415"

 
    rs = pub_sub_client.publish_message(
        topic_name=topic_name, message=form_dict, ordering_key=ordering_key
    )
    print(
        f"Message {rs} published successfully to topic {topic_name} with ordering key {ordering_key}"
    )
python-3.x fastapi google-cloud-pubsub
1个回答
0
投票

我已经利用FastAPI的生命周期解决了这个问题。 (https://fastapi.tiangolo.com/advanced/events/)

我刚刚在 LifeSpan 中初始化了 Pubsub 客户端,并在路由中使用它。

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    db_manager.init()
    app.state.pub_sub_client_ordered = get_pub_sub(enable_message_ordering=True)
    app.state.pub_sub_client = get_pub_sub(enable_message_ordering=False)

    yield
    await db_manager.close()
© www.soinside.com 2019 - 2024. All rights reserved.