@Bean
public KafkaStreams kafkaStreams(KafkaStreamsConfiguration streamsConfig) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> kStream = builder.stream("input-topic", Consumed.with( Serdes.String(), Serdes.String() ));
ObjectMapper objectMapper = new ObjectMapper();
// Use mapValues to parse JSON and extract the timestamp
kStream = kStream.mapValues(value -> {
try {
JsonNode jsonNode = objectMapper.readTree(value);
long timestamp = jsonNode.get("timestamp").asLong();
long tenMinutesAgo = System.currentTimeMillis() / 60000; // Current time in minutes
if (timestamp <= tenMinutesAgo) {
log.info("paased timestamp: " + timestamp);
log.info("original tenMinutesAgo: " + tenMinutesAgo);
return value; // Keep messages that match the criteria
} else {
return null; // Filter out messages that don't match the criteria
}
} catch (Exception e) {
e.printStackTrace();
return null;
}
}).filter((key, value) -> value != null); // Remove messages that failed parsing
kStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
return kafkaStreams;
}
在输入主题中我通过了
{"timestamp": "28285312","id":"time01"}
{"timestamp": "28285420","id":"time02"}
{"timestamp": "28285412","id":"time05"}
{"timestamp": "28285412","id":"time03"}
{"timestamp": "28285412","id":"time04"}
在output-topic中,它检查最新消息时间戳与当前时间戳,如果小于或等于当前时间戳,则转到output-topic。因此,这里第三个时间戳大于当前时间戳,因此它不会移动到输出主题,但一旦它达到与第三个时间戳相同的时间戳,kafkastream 就不会选择并放入输出主题。
在 Kafka Streams 中,必须添加一个静态
consumerPrefix()
函数来修改消费者客户端属性。
否则,您必须传递
consumer.auto.offset.reset
作为属性键