Java 客户端不会从 GCP Pub/Sub 主题检索未确认的消息

问题描述 投票:0回答:1

我有一个 Spring Boot 客户端配置

spring:
  cloud:
    gcp:
      pubsub:
        publisher:
          enable-message-ordering: true
        subscriber:
          flow-control:
            max-outstanding-element-count: 10
(among other spring configs)

使用此 pub/sub 配置来设置

MANUAL
AckMode:

@Configuration
public class PubSubConfig {
    ...

    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
            MessageChannel inputChannel,
            AppConfig appConfig,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "(subscription id)");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(SomeEvent.class);
        return adapter;
    }
}

我有一个从这个基于 pull 的主题中提取的事件处理程序

@Component
@Slf4j
@RequiredArgsConstructor
public class EventHandler {

    @ServiceActivator(inputChannel = "inputChannel")
    public void process(SomeEvent someEvent,
                        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        logger.info("Received event {}", someEvent);
        long now = System.currentTimeMillis() / 1000L;
        if (someEvent.getEarliestRetryTime() > now) {
            logger.warn(
                    "Event not ready to run yet {}",
                    new Date(someEvent.getEarliestRetryTime() * 1000)
            );
            // No-ack the message
            return;
        }

        // Do stuff
        bool success = doStuff();
        if (success) {
            message.ack();
        }
        // Not successful - no ack and retry
    }
}

总体思路是消息等待10分钟后再处理。不确认消息应导致在 600 秒内再次重试消息,如此处配置。

enter image description here

但是,此消息 1) 收到一次,记录

Event not ready to run yet [timestamp in the past]
,并保留在队列中。

enter image description here

目标是尽可能长时间地重试消息,每 10 分钟重试一次,直到

doStuff
成功。我可能会错过什么?

google-cloud-platform gcloud google-cloud-pubsub
1个回答
0
投票

根据您的设置,您似乎需要将消息重新传递给您的客户端,以重新检查它们是否 >= 10m 旧才能处理它们。

要实现这一点,您需要在 Pub/Sub 尝试重新传送消息之前使现有消息传送过期。


您的订阅者是否设置了

spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period
,如果是的话,设置为什么值?

默认情况下,spring 库不会尝试通过修改消息来防止消息过期 (doc)

由于您希望消息在太年轻的情况下过期,以便稍后可以重新发送以重试它们,因此您应该保留

spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period
未设置。


您的出版商是否使用多个订购键进行发布?

如果发布到您主题的所有消息都具有相同的排序键,则该主题的启用排序的订阅者将不会再收到任何消息,直到它确认已传递的消息为止。

我建议查看消息排序文档,看看使用消息排序是否适合您的用例。

© www.soinside.com 2019 - 2024. All rights reserved.