Google pubsub SubscriberClient AttributeError:“生成器”对象没有属性“add_done_callback”

问题描述 投票:0回答:1

我正在开发一个项目,必须使用我的 GCP Pubsub 订阅中的一些消息。在本地我可以正常消费消息。

但是当我发布对云中容器的更改(例如 staging env)时,我收到此错误:

2024-05-14 15:50:15,379  ERROR [google.api_core.bidi:_thread_main:678] [pid=1] [tname=Thread-ConsumeBidirectionalStream] [cluster=mt2] [1.65.1] Thread-ConsumeBidirectionalStream caught unexpected exception 'generator' object has no attribute 'add_done_callback' and will exit.
Traceback (most recent call last):
  File "/opt/project/lib/python3.9/site-packages/google/api_core/bidi.py", line 644, in _thread_main
    self._bidi_rpc.open()
  File "/opt/project/lib/python3.9/site-packages/google/api_core/bidi.py", line 294, in open
    call._wrapped.add_done_callback(self._on_call_done)
AttributeError: 'generator' object has no attribute 'add_done_callback'

我的代码很简单:一个带有 bash 入口点的 docker 容器,启动一个初始化消费者的 python 文件。

初始化.py

def main():
    """
    Entrypoint for all consumers
    """

    args = get_args()
    config_settings = get_config_section(args.config_filename)
    consumer = BaseGooglePubSubConsumer(config_settings)
    consumer.run()


if __name__ == "__main__":
    main()

消费者.py

from functools import cached_property

from google.cloud.pubsub_v1 import SubscriberClient

class BaseGooglePubSubConsumer(BaseConsumer):

    @cached_property
    def consumer(self) -> SubscriberClient:
        json_account_info = self._get_service_account_info()
        credentials = service_account.Credentials.from_service_account_info(json_account_info)
        return SubscriberClient(credentials=credentials)

    def run(self):
        """
        Consumes messages from a Pub/Sub Subscriber.
        """
        streaming_pull_future = self.consumer.subscribe(self._get_subscription_path(), callback=self.process_message)
        with self.consumer:
            try:
                streaming_pull_future.result()
            except Exception as exc:
                le.errors = "Failed to process message"
                logger.error(le, exc_info=exc)
                streaming_pull_future.cancel()
                streaming_pull_future.result()
            finally:
                le.end()
    def process_message(self, message: Message) -> None:
        # message.ack()
        logger.info(message)

我已阅读有关该错误的信息,并且发现了类似的问题但没有答案:https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180

python google-api google-cloud-pubsub google-api-python-client grpc-python
1个回答
0
投票

我也有同样的问题。它可以在 Macbook M2 和 Windows WSL 中本地运行。但是一旦它被容器化到 docker 中,它就会抛出完全相同的错误。

© www.soinside.com 2019 - 2024. All rights reserved.