在订阅上应用过滤器时,Python Pubsub 订阅者客户端不会拉取消息

问题描述 投票:0回答:1

我在我的 Python Web 应用程序中使用 Pubsub 流式拉取订阅。当我没有应用任何订阅过滤器时,订阅者客户端能够成功从订阅中提取消息。 但是,如果应用了订阅过滤器,订阅者将停止拉取消息。如果我手动转到特定订阅并单击“拉取”,我可以看到订阅中有消息(显然与过滤条件匹配,因此是存在于订阅中)。但客户端无法拉取任何这些消息。 我需要为客户端做任何额外的配置吗?我的订阅者客户端的代码如下:-

import os

from google.cloud import pubsub_v1
from app.services.subscription_service import save_bill_events
from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID
from app.utils.logging_tracing_manager import get_logger



logger = get_logger(__file__)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    save_bill_events(message.data)
    message.ack()


subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(os.environ.get(BILL_SUBSCRIPTION_GCP_PROJECT_ID),
                                                 BILL_EVENT_SUBSCRIPTION_ID)

# Limit the subscriber to only have fixed number of  outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=50)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)



async def poll_bill_subscription():
    
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
           
            streaming_pull_future.result()
        except Exception as e:
            # Even in case of an exception, subscriber should keep listening
            logger.error(
                f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}",
                exc_info=True)
            
            pass
python google-cloud-pubsub publish-subscribe
1个回答
0
投票

我想通了。我的应用程序在创建订阅后立即创建订阅者客户端。当使用过滤器创建订阅时,可能需要一些时间,在此期间它不会注册订阅者客户端,也不会抛出任何错误。 重新部署后就可以正常工作了。我添加了一个小睡眠,现在效果很好

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.