我在发布商方面遇到了一个奇怪的问题,无法解决。让我简要介绍一下我正在做的事情。
我有一个用 go 编写的服务,为了执行一些操作,该服务正在创建一些 K8 作业,它将启动一个 python pod 到进程并发布回结果。
我记录从
future.result()
库中的 publish()
函数返回的 pubsub_v1
。我可以看到这样的日志。为了简单起见,我配置的东西只向我们发送 6 个命令。
Published msg: 4
Published msg: 7
Published msg: 10
Published msg: 13
Published msg: 16
Published msg: 19
如上所示,已发布命令的每个 msgId 有 3 个差异。
对于 go 服务,我将日志提供给我们 msgId 和发布时间,并且我看到了此日志。
Consumed msg_id: 4, publish_time: 2023-08-10 13:29:02.365 +0000 UTC
Consumed msg_id: 5, publish_time: 2023-08-10 13:29:02.638 +0000 UTC
Consumed msg_id: 7, publish_time: 2023-08-10 13:29:03.167 +0000 UTC
Consumed msg_id: 8, publish_time: 2023-08-10 13:29:03.312 +0000 UTC
Consumed msg_id: 10, publish_time: 2023-08-10 13:29:03.946 +0000 UTC
Consumed msg_id: 11, publish_time: 2023-08-10 13:29:04.107 +0000 UTC
Consumed msg_id: 13, publish_time: 2023-08-10 13:29:04.674 +0000 UTC
Consumed msg_id: 14, publish_time: 2023-08-10 13:29:04.827 +0000 UTC
Consumed msg_id: 16, publish_time: 2023-08-10 13:29:05.408 +0000 UTC
Consumed msg_id: 17, publish_time: 2023-08-10 13:29:05.61 +0000 UTC
Consumed msg_id: 19, publish_time: 2023-08-10 13:29:06.145 +0000 UTC
Consumed msg_id: 20, publish_time: 2023-08-10 13:29:06.294 +0000 UTC
从这里可以看出,尽管我们似乎发布了 6 个命令,但我们收到的命令是双倍的,并且与 Python 端不匹配的 msgId 已损坏数据。 (缺少原型字段以及我们在发布者库中创建的一些额外属性)。匹配的包含了我完全需要的数据。
请注意,这是我在 Kubernetes 中的开发环境。所有 pod 都在同一名称空间中运行,并使用 pub-sub 模拟器而不是真实的模拟器(如果这有什么区别的话)
Python 代码基本上向某些第三方发送 SOAP 请求以获取结果并发布命令。分页详细信息是通过环境变量从 go 端传递的。
for page in self._config.pages:
try:
data = self._get_results(page)
if not data:
continue
self._publish(data, page, success=True)
except Exception as e:
self._publish(success=False)
因此,我重写了我们库的发布方法来使用一些重试属性,根据我的研究,我知道这种情况可能会发生,因为 pubSub 库可以重试,并且可能存在重试和初始重试可以同时成功的情况,并且这可能会导致仅发送重试消息的 msg_id。
我会将发布部分放在其中,并删除不必要的业务逻辑以使事情变得简单。
这是发布者客户端的初始化方式
self.publisher: Client = pubsub_v1.PublisherClient(
publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True),
batch_settings=pubsub_v1.types.BatchSettings(
max_messages=100,
max_bytes=1000000,
max_latency=0.01,
)
)
我们使用默认的重试选项,我将其添加到我的重写函数中。
some logic
...
# tried editing this object, removing predicates, increasing initial value etc.
retry_options = retry.Retry(
initial=100, # default 0.1
maximum=60.0,
multiplier=1.45,
predicate=retry.if_exception_type(
core_exceptions.Aborted,
core_exceptions.Cancelled,
core_exceptions.DeadlineExceeded,
core_exceptions.InternalServerError,
core_exceptions.ResourceExhausted,
core_exceptions.ServiceUnavailable,
core_exceptions.Unknown,
),
deadline=600.0)
publish_future = self.publisher.publish(topic_path, data, retry=retry_options, **attributes)
if callback is not None:
publish_future.add_done_callback(callback)
return publish_future
else:
result = publish_future.result()
self._logger.log_info(f"Published msg: {result}")
代码片段非常简单,让事情变得简单。我可以向您保证,不会在应用程序级别重试任何发布逻辑。
我想指出的是,Python 代码可以自行运行,不会出现任何故障,我编写了一个虚拟订阅者,并通过 docker-compose 使用相同的 Pubsub 模拟器在本地运行内容。如果需要 5 条消息,则收到 5 条消息,当它在 K8 中运行时会发生这种情况
我使用的是python版本3.11.1
google相关库的版本。
grpcio 1.51.1
grpcio-health-checking 1.51.1
grpcio-reflection 1.51.1
grpcio-status 1.51.1
grpcio-tools 1.51.1
google-cloud-pubsub 2.16.0
google-api-core 2.11.1
google-auth 2.22.0
grpc-google-iam-v1 0.12.6
我尝试过的
100m
增加到 200m
(我认为这个问题与此无关,所以我没有将其增加太多来看看会发生什么。我可以尝试一下) 这是我查过的一些资源。
这里指出
One of the most typical reasons would be a DEADLINE_EXCEEDED error, which occurs when the client does not receive a response quickly enough from the server. This can result in duplicates as both the initial request and the retried request could ultimately succeed and you would only get the message ID back from the second request
这似乎与我所看到的问题类似。
有关解决问题的 pub-sub 文档 https://cloud.google.com/pubsub/docs/troubleshooting#publish-deadline-exceeded
我想知道是否有人有任何想法并为我指出正确的方向,也许我该如何解决这个问题。我还可以尝试什么来调试这个问题?
更新
我刚刚尝试使用空数据,并收到 6 条 go 消息。这意味着数据大小是导致发布延迟的问题。我怎样才能减轻这种情况?进一步增加初始超时?增加CPU?
这与其说是一种解决方案,不如说是一种解决方法。我一直遇到类似的问题,在一些测试中我一直使用模拟器在两个服务之间传递单个消息。
我相信模拟器在推送订阅中发送消息后没有收到确认 - 当发送单个消息时。这很难验证,因为模拟器中的日志仅在消息发布到主题时打印。
原因是订阅上的
ack_deadline_seconds
设置为1秒后。即使请求成功,来自模拟器的消息也会立即重试。
解决方法是在每次测试开始时创建订阅并在每次测试结束时删除它。另外,将
ack_deadline_seconds
设置为任意高(默认的 应该 就足够了),这样就不会发生重试。我相信这是一个可以接受的解决方案,因为模拟器仅用于测试目的。
参见下面的示例
pytest.fixture
:
@pytest.fixture(autouse=True)
def subscription():
"""
Recreates a push subscription for every test
due to an emulator bug that retries when sending a single message
"""
subscriber = SubscriberClient()
topic = "dummy-topic"
project = os.environ["GCP_PROJECT_ID"]
topic_name = subscriber.topic_path(project, topic)
url = "http://my-service:8080"
subscription_name = subscriber.subscription_path(project, "thiswillbedeleted")
push_config = types.PushConfig(
push_endpoint=url,
# NOTE: this next line creates a subscription with "noWrapper": { "writeMetadata": true }
# but the emulator does not support this feature yet.
# no_wrapper=types.PushConfig.NoWrapper(write_metadata=True)
)
with subscriber:
subscription = subscriber.create_subscription(
{
"name": subscription_name,
"topic": topic_name,
"push_config": push_config,
# no acknowledgement is received by the emulator - this value has
# been set arbitralily high so that requests aren't retried
# 10 seconds is the default
"ack_deadline_seconds": 10,
}
)
yield subscription
subscriber.delete_subscription(subscription=subscription_name)