Kafka Streams 和 CompletableFuture(或异步 java api)

问题描述 投票:0回答:1

我正在研究 Apache Kafka Stream SPI。我想知道是否有一种方法可以在

mapValues
方法内部执行异步代码。例如从外部存储检索数据。有没有一种方法可以以事件循环响应式方式与 Kaska Streams 进行交互?

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder
            .stream("SOURCE_TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues((readOnlyKey, value) -> value.toUpperCase()) 
            .to("DESTINATION_TOPIC", Produced.with(Serdes.String(), Serdes.String()));
    var topology = streamsBuilder.build()

    var kafkaStreams = new KafkaStreams(topology, properties);

如何替换此

mapValues
代码

value.toUpperCase()

与:

CompletableFuture.completedFuture(value.toUpperCase())
java apache-kafka reactive-programming apache-kafka-streams completable-future
1个回答
0
投票

Kafka Streams 无意调用外部存储。 StoreBuilder 可能是可插拔的,但要执行您所要求的操作,您应该首先将远程存储(Kafka Connect、Debezium 等)中的流传输到一个主题中,构建一个 KTable,将流与该表连接起来,如下所示事件。这可能需要使用处理器 API,而不是 DSL(或您所说的 SPI)。

© www.soinside.com 2019 - 2024. All rights reserved.