我正在使用
spring-boot-starter-parent
版本 3.1.2
和 spring-cloud-stream-binder-kafka-streams
版本 4.0.3
绝大多数在线示例都显示使用
@Input
注释创建 GlobalKTable,该注释已被弃用。我还没有找到如何使用最新版本的 Kafka Streams Binder 创建 GKT 作为 KeyValueStore 的工作示例。
我发现的最接近的是绕过 Binder 并使用 GKT 手动创建 KafkaStreams bean:
@Bean
public KafkaStreams kafkaStreams(
@Value("${spring.cloud.stream.kafka.streams.binder.brokers}") String brokers) {
StreamsBuilder builder = new StreamsBuilder();
// Create a GlobalKTable
builder.globalTable(
Topics.GKT_TOPIC,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(
Stores.MY_GKT_STORE_NAME)
.withKeySerde(KeySerde)
.withValueSerde(ValueSerde));
// Setup stream processing logic here...
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Start the streams
streams.start();
return streams;
}
任何人都可以向我指出如何使用最新版本的 Kafka Streams Binder 执行此操作的文档或示例吗?
我终于在文档中找到了如何做到这一点
如上所述,绑定器不提供将全局状态存储注册为功能的一流方法。为此,您需要使用定制器。以下是如何做到这一点。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}