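I'm working on a project that needs to consume messages from my GCP Pub/Sub subscription. Locally, I can consume messages without any problem.
But when I deploy my changes to a container in the cloud (for example, the staging environment), I get this error: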
2024-05-14 15:50:15,379 ERROR [google.api_core.bidi:_thread_main:678] [pid=1] [tname=Thread-ConsumeBidirectionalStream] [cluster=mt2] [1.65.1] Thread-ConsumeBidirectionalStream caught unexpected exception 'generator' object has no attribute 'add_done_callback' and will exit.
Traceback (most recent call last):
  File "/opt/project/lib/python3.9/site-packages/google/api_core/bidi.py", line 644, in _thread_main
    self._bidi_rpc.open()
  File "/opt/project/lib/python3.9/site-packages/google/api_core/bidi.py", line 294, in open
    call._wrapped.add_done_callback(self._on_call_done)
AttributeError: 'generator' object has no attribute 'add_done_callback'
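
For readers unfamiliar with this kind of failure, here is a minimal sketch of one way such an AttributeError can arise. It assumes that something in the deployed environment (for example an instrumentation layer, as the linked issue further down suggests) wraps the streaming call in a plain generator. `FakeCall` and `wrap_in_generator` are hypothetical stand-ins written for this illustration; they are not google-api-core or gRPC code, and this is not a confirmed diagnosis of the error above.

```python
class FakeCall:
    """Hypothetical stand-in for a streaming call: iterable, with done-callbacks."""

    def __init__(self, responses):
        self._responses = list(responses)
        self.callbacks = []

    def __iter__(self):
        return iter(self._responses)

    def add_done_callback(self, fn):
        # Real call objects invoke fn when the RPC terminates; here we only record it.
        self.callbacks.append(fn)


def wrap_in_generator(call):
    """Hypothetical wrapper: re-yields each response, as an interceptor might."""
    for response in call:
        yield response


def supports_done_callback(obj):
    """Return True if obj exposes add_done_callback."""
    return hasattr(obj, "add_done_callback")


raw_call = FakeCall(["msg-1", "msg-2"])
wrapped_call = wrap_in_generator(raw_call)

print(supports_done_callback(raw_call))      # the original call object
print(supports_done_callback(wrapped_call))  # the generator that replaced it
```

The wrapped object still yields the same responses, but it is a generator, so any caller that expects the original call's methods (such as add_done_callback) fails on attribute lookup.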
My code is simple: a Docker container with a bash entrypoint that starts a Python file, which initializes the consumer.
def main():
    """
    Entrypoint for all consumers
    """
    args = get_args()
    config_settings = get_config_section(args.config_filename)
    consumer = BaseGooglePubSubConsumer(config_settings)
    consumer.run()


if __name__ == "__main__":
    main()

import logging
from functools import cached_property

from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message
from google.oauth2 import service_account

logger = logging.getLogger(__name__)


class BaseGooglePubSubConsumer(BaseConsumer):
    @cached_property
    def consumer(self) -> SubscriberClient:
        # Build the client once from the service account JSON and cache it.
        json_account_info = self._get_service_account_info()
        credentials = service_account.Credentials.from_service_account_info(json_account_info)
        return SubscriberClient(credentials=credentials)

    def run(self):
        """
        Consumes messages from a Pub/Sub Subscriber.
        """
        # NOTE: `le` is not defined in the snippet as posted.
        streaming_pull_future = self.consumer.subscribe(self._get_subscription_path(), callback=self.process_message)
        with self.consumer:
            try:
                # Block until the streaming pull stops or raises.
                streaming_pull_future.result()
            except Exception as exc:
                le.errors = "Failed to process message"
                logger.error(le, exc_info=exc)
                streaming_pull_future.cancel()
                # Wait for the shutdown to complete.
                streaming_pull_future.result()
            finally:
                le.end()

    def process_message(self, message: Message) -> None:
        # message.ack()
        logger.info(message)
I have read up on this error and found a similar issue, but it has no answer: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180
I have the same problem. It runs fine locally on a MacBook M2 and in Windows WSL, but as soon as it is containerized with Docker, it throws exactly the same error.
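
Since the code works locally but fails inside the container, one thing worth ruling out is a difference in installed packages between the two environments. The sketch below uses only the standard library's `importlib.metadata` to list installed distributions whose names contain certain fragments. The fragment list and the function name `installed_versions` are assumptions chosen for this illustration; adjust them to whatever your image actually installs. Run it both locally and in the container and compare the output.

```python
from importlib import metadata

# Assumed name fragments worth comparing; not an authoritative list.
SUSPECT_FRAGMENTS = ("opentelemetry", "grpc", "google-api-core", "google-cloud-pubsub")


def installed_versions(fragments=SUSPECT_FRAGMENTS):
    """Return {distribution name: version} for installed packages matching any fragment."""
    found = {}
    for dist in metadata.distributions():
        # Name can be missing for broken installs, so fall back to an empty string.
        name = (dist.metadata["Name"] or "").lower()
        if any(fragment in name for fragment in fragments):
            found[name] = dist.version
    return found


if __name__ == "__main__":
    for name, version in sorted(installed_versions().items()):
        print(f"{name}=={version}")
```

If a package shows up in the container but not locally (or with a different version), that difference is a reasonable place to start investigating, though it does not by itself prove the cause.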