我通过在 GCP 的 Cloud Run 上运行的 FastAPI 应用程序接收来自 Twilio 的 Webhook。该应用程序接收消息并将其发布到 PubSub 主题。该主题有基于电话号码的消息排序。
在某些时候,应用程序会停止响应 Webhooks 并开始返回 504 错误。我无法确定问题出在哪里。我怀疑当我实例化 PubSub 客户端时可能存在一些并发问题。 “恢复”服务的唯一方法是重新启动 Cloud Run 容器。 错误日志中没有显示任何内容。我正在考虑为 PubSub 客户端启用日志记录,但我也不知道该怎么做。
从日志中可以看到,最后一条消息是“Instantiating PubSub client...”,然后 POD 卡住了。
从 google.cloud 导入 pubsub
def get_pub_sub():
return PubSub("primary-care-378721")
class PubSub:
def __init__(self, project_name, enable_message_ordering=True) -> None:
print("Instantiating PubSub client...")
publisher_options = pubsub.types.PublisherOptions(
enable_message_ordering=enable_message_ordering
)
self.project_name = project_name
self.publisher = pubsub.PublisherClient(publisher_options=publisher_options)
print("Done")
def publish_message(
self, topic_name: str, message: dict, ordering_key: str
) -> None:
topic_path = self.publisher.topic_path(self.project_name, topic_name)
data = json.dumps(message).encode("utf-8")
future = self.publisher.publish(
topic=topic_path, data=data, ordering_key=ordering_key
)
return future.result()
这是快速 API 路由器代码:
@router.post("/incoming-message", status_code=status.HTTP_204_NO_CONTENT)
async def receive_incoming_message(
request: Request, pub_sub_client=Depends(get_pub_sub)
):
print("Processing request...")
form_data = await request.form()
form_dict = form_data._dict
ordering_key = form_dict.get("From")
topic_name = "twilio_incoming_message-de6f415"
rs = pub_sub_client.publish_message(
topic_name=topic_name, message=form_dict, ordering_key=ordering_key
)
print(
f"Message {rs} published successfully to topic {topic_name} with ordering key {ordering_key}"
)
我已经利用FastAPI的生命周期解决了这个问题。 (https://fastapi.tiangolo.com/advanced/events/)
我刚刚在 LifeSpan 中初始化了 Pubsub 客户端,并在路由中使用它。
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
db_manager.init()
app.state.pub_sub_client_ordered = get_pub_sub(enable_message_ordering=True)
app.state.pub_sub_client = get_pub_sub(enable_message_ordering=False)
yield
await db_manager.close()