我有一个使用 Spring Cloud Stream Binder 的 GCP pub sub 实现。
我想仅当 ackMode 设置为 MANUAL 时才对消息进行手动确认。 目前我正在使用
@Bean
public Consumer<Message<String>> pubsub1() {
return message -> {
log.info("New message received from Pub/Sub: {}", message);
// For manual acknowledgment
BasicAcknowledgeablePubsubMessage acknowledgeablePubsubMessage = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
try {
// Process the message payload
messageProcessor.processMessage("GCP-PubSub", message.getPayload());
// Acknowledge the message
acknowledgeablePubsubMessage.ack();
log.info("Message acknowledged successfully.");
} catch (Exception e) {
log.error("Error processing message: {}", e.getMessage(), e);
// NACK the message
acknowledgeablePubsubMessage.nack();
log.info("Message negatively acknowledged.");
}
};
}
有没有更好的方法可以在没有 GCP 特定术语的情况下做到这一点,例如
message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
我还在 yml 中的消费者绑定中设置了 ack-mode。 有没有办法在执行此操作之前检查消费者是否设置为自动或手动确认?
不,此类功能没有通用 API。 嗯,Spring Integration 中有 - 请参阅
org.springframework.integration.acks
。但这并不是作为特定于协议的驱动程序的包装来实现的。例如,即使对于 Apache Kafka 绑定器,我们仍然依赖 org.springframework.kafka.support.Acknowledgment
标头:https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/manual-ack.html。
该标头的存在应该足以确保我们处于
AUTO
或 MANUAL
模式。但不确定 Pub/Sub 绑定器是如何实现的。