我在我的 Python Web 应用程序中使用 Pubsub 流式拉取订阅。当我没有应用任何订阅过滤器时,订阅者客户端能够成功从订阅中提取消息。 但是,如果应用了订阅过滤器,订阅者将停止拉取消息。如果我手动转到特定订阅并单击“拉取”,我可以看到订阅中有消息(显然与过滤条件匹配,因此是存在于订阅中)。但客户端无法拉取任何这些消息。 我需要为客户端做任何额外的配置吗?我的订阅者客户端的代码如下:-
import os
from google.cloud import pubsub_v1
from app.services.subscription_service import save_bill_events
from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID
from app.utils.logging_tracing_manager import get_logger
logger = get_logger(__file__)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
save_bill_events(message.data)
message.ack()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(os.environ.get(BILL_SUBSCRIPTION_GCP_PROJECT_ID),
BILL_EVENT_SUBSCRIPTION_ID)
# Limit the subscriber to only have fixed number of outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=50)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
async def poll_bill_subscription():
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result()
except Exception as e:
# Even in case of an exception, subscriber should keep listening
logger.error(
f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}",
exc_info=True)
pass
我想通了。我的应用程序在创建订阅后立即创建订阅者客户端。当使用过滤器创建订阅时,可能需要一些时间,在此期间它不会注册订阅者客户端,也不会抛出任何错误。 重新部署后就可以正常工作了。我添加了一个小睡眠,现在效果很好