设置如下:
我有一个主题设置来接收消息,然后我有一个使用Pub/Sub to GCS Text Template设置的Dataflow管道,它将消息转储到GCS Bucket中的窗口文本文件中。这工作正常 - 我最终在GCS中获取包含我通过控制台发送的测试消息的文件,使用我主题上的“发布消息”按钮(旁注我以为它会保存消息的“数据”部分,但它看起来像它只保存邮件正文。但我可以解决这个问题)。
问题:
我打算使用python客户端(最终来自App Engine)向主题发送消息...但是当我从本地计算机发送消息时,我无法使用它。我正在使用https://cloud.google.com/pubsub/docs/publisher模块跟踪pubsub_v1
中非常简单的示例。即使批量设置设置为1kb / 1s,我也无法成功发布任何内容。我在未来5分钟后没有回复时继续收到此错误:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/path/to/environment/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/batch/thread.py", line 239, in monitor
return self._commit()
File "/path/to/environment/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/batch/thread.py", line 204, in _commit
self._messages,
File "/path/to/environment/lib/python2.7/site-packages/google/cloud/pubsub_v1/gapic/publisher_client.py", line 325, in publish
return self._publish(request, retry=retry, timeout=timeout)
File "/path/to/environment/lib/python2.7/site-packages/google/api_core/gapic_v1/method.py", line 139, in __call__
return wrapped_func(*args, **kwargs)
File "/path/to/environment/lib/python2.7/site-packages/google/api_core/retry.py", line 260, in retry_wrapped_func
on_error=on_error,
File "/path/to/environment/lib/python2.7/site-packages/google/api_core/retry.py", line 195, in retry_target
last_exc)
File "/path/to/environment/lib/python2.7/site-packages/six.py", line 737, in raise_from
raise value
RetryError: Deadline of 600.0s exceeded while calling <functools.partial object at 0x10662de68>, last exception: 503 Getting metadata from plugin failed with error: ('invalid_grant: Bad Request', u'{\n "error" : "invalid_grant",\n "error_description" : "Bad Request"\n}')
代码示例:
>>> from google.cloud import pubsub_v1
>>> BATCH_SETTINGS = pubsub_v1.types.BatchSettings(max_bytes=1024,max_latency=1)
>>> publisher = pubsub_v1.PublisherClient(BATCH_SETTINGS)
>>> topic_path = publisher.topic_path("my-project", "topic-name")
>>> publisher.publish(topic_path, b'first message from python 3:38:59')
更新:部署我的应用程序后,一切正常,在生产中。这对我的需求很好,但是知道为什么它在我的计算机上以调试模式在本地运行时不起作用仍然会很好。
这是一个已知问题,如果您运行python的环境的系统时钟不同步,您将无法进行身份验证。很难知道如何在不知道运行什么类型的系统的情况下同步时钟,但是您可以查看here以获取更多详细信息。