我有一个 Spring Boot 客户端配置
spring:
cloud:
gcp:
pubsub:
publisher:
enable-message-ordering: true
subscriber:
flow-control:
max-outstanding-element-count: 10
(among other spring configs)
使用此 pub/sub 配置来设置
MANUAL
AckMode:
@Configuration
public class PubSubConfig {
...
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
MessageChannel inputChannel,
AppConfig appConfig,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "(subscription id)");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(SomeEvent.class);
return adapter;
}
}
我有一个从这个基于 pull 的主题中提取的事件处理程序
@Component
@Slf4j
@RequiredArgsConstructor
public class EventHandler {
@ServiceActivator(inputChannel = "inputChannel")
public void process(SomeEvent someEvent,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
logger.info("Received event {}", someEvent);
long now = System.currentTimeMillis() / 1000L;
if (someEvent.getEarliestRetryTime() > now) {
logger.warn(
"Event not ready to run yet {}",
new Date(someEvent.getEarliestRetryTime() * 1000)
);
// No-ack the message
return;
}
// Do stuff
bool success = doStuff();
if (success) {
message.ack();
}
// Not successful - no ack and retry
}
}
总体思路是消息等待10分钟后再处理。不确认消息应导致在 600 秒内再次重试消息,如此处配置。
但是,此消息 1) 收到一次,记录
Event not ready to run yet [timestamp in the past]
,并保留在队列中。
目标是尽可能长时间地重试消息,每 10 分钟重试一次,直到
doStuff
成功。我可能会错过什么?
根据您的设置,您似乎需要将消息重新传递给您的客户端,以重新检查它们是否 >= 10m 旧才能处理它们。
要实现这一点,您需要在 Pub/Sub 尝试重新传送消息之前使现有消息传送过期。
您的订阅者是否设置了
spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period
,如果是的话,设置为什么值?
默认情况下,spring 库不会尝试通过修改消息来防止消息过期 (doc)
由于您希望消息在太年轻的情况下过期,以便稍后可以重新发送以重试它们,因此您应该保留
spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period
未设置。
您的出版商是否使用多个订购键进行发布?
如果发布到您主题的所有消息都具有相同的排序键,则该主题的启用排序的订阅者将不会再收到任何消息,直到它确认已传递的消息为止。
我建议查看消息排序文档,看看使用消息排序是否适合您的用例。