我正在研究 Apache Kafka Stream SPI。我想知道是否有一种方法可以在
mapValues
方法内部执行异步代码。例如从外部存储检索数据。有没有一种方法可以以事件循环响应式方式与 Kaska Streams 进行交互?
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder
.stream("SOURCE_TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
.mapValues((readOnlyKey, value) -> value.toUpperCase())
.to("DESTINATION_TOPIC", Produced.with(Serdes.String(), Serdes.String()));
var topology = streamsBuilder.build()
var kafkaStreams = new KafkaStreams(topology, properties);
如何替换此
mapValues
代码
value.toUpperCase()
与:
CompletableFuture.completedFuture(value.toUpperCase())
Kafka Streams 无意调用外部存储。 StoreBuilder 可能是可插拔的,但要执行您所要求的操作,您应该首先将远程存储(Kafka Connect、Debezium 等)中的流传输到一个主题中,构建一个 KTable,将流与该表连接起来,如下所示事件。这可能需要使用处理器 API,而不是 DSL(或您所说的 SPI)。