KafkaStreams does not read messages from Kafka from the beginning

Question (votes: 0, answers: 1)
    @Bean
    public KafkaStreams kafkaStreams(KafkaStreamsConfiguration streamsConfig) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
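        // Note: Kafka Streams always uses application.id as the consumer group id, so this setting has no effect.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());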
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> kStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        ObjectMapper objectMapper = new ObjectMapper();
        
        // Use mapValues to parse JSON and extract the timestamp
        kStream = kStream.mapValues(value -> {
            try {
                JsonNode jsonNode = objectMapper.readTree(value);
                long timestamp = jsonNode.get("timestamp").asLong();
                long currentMinutes = System.currentTimeMillis() / 60000; // Current time in minutes since the epoch
                if (timestamp <= currentMinutes) {
                    log.info("passed timestamp: " + timestamp);
                    log.info("current time in minutes: " + currentMinutes);
                    return value; // Keep messages that match the criteria
                } else {
                    return null; // Filter out messages that don't match the criteria
                }
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }).filter((key, value) -> value != null); // Drop messages that failed parsing or whose timestamp is still in the future

        kStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

I sent the following messages to the input topic:

{"timestamp": "28285312","id":"time01"}
{"timestamp": "28285420","id":"time02"}
{"timestamp": "28285412","id":"time05"}
{"timestamp": "28285412","id":"time03"}
{"timestamp": "28285412","id":"time04"}

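The stream compares each message's timestamp with the current timestamp; if the message timestamp is less than or equal to the current timestamp, the message goes to output-topic. Here the third timestamp is greater than the current timestamp, so that message is not moved to the output topic. But once the current time reaches that timestamp, Kafka Streams still does not pick the message up and put it into the output topic.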
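To make the described behavior concrete, here is a minimal sketch of the comparison on its own, with no Kafka dependency. The class name DueCheckSketch, the helper isDue, and the fixed "current minute" value are all assumptions for illustration; only the sample timestamps come from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DueCheckSketch {

    // Mirrors the condition in the mapValues step: keep a record only if its
    // timestamp (minutes since the epoch) is not later than the current minute.
    static boolean isDue(long timestampMinutes, long nowMinutes) {
        return timestampMinutes <= nowMinutes;
    }

    public static void main(String[] args) {
        // Hypothetical "current minute" at the moment the records are processed.
        long nowMinutes = 28285412L;

        // Sample records from the question, in the order they were sent.
        Map<String, Long> records = new LinkedHashMap<>();
        records.put("time01", 28285312L);
        records.put("time02", 28285420L);
        records.put("time05", 28285412L);
        records.put("time03", 28285412L);
        records.put("time04", 28285412L);

        for (Map.Entry<String, Long> e : records.entrySet()) {
            String decision = isDue(e.getValue(), nowMinutes) ? "forward" : "drop";
            System.out.println(e.getKey() + " -> " + decision);
        }
    }
}
```

In the topology above this check runs once per record, at the moment the record is consumed. A record that is dropped because its timestamp is still in the future is not looked at again later, since the stateless mapValues and filter steps have nowhere to hold it for a second check.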

java apache-kafka apache-kafka-streams
1 answer
0
votes

In Kafka Streams, you must use the static StreamsConfig.consumerPrefix() function to modify consumer client properties.

Otherwise, you have to pass consumer.auto.offset.reset as the property key.
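As a rough sketch of the second option, the prefixed key can be written out by hand. The class name PrefixedConsumerConfig, the method streamsProperties, and the application id "demo-app" are hypothetical; the constant below spells out the "consumer." prefix so the example needs nothing beyond the JDK. With kafka-streams on the classpath, StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) produces the same key.

```java
import java.util.Properties;

class PrefixedConsumerConfig {

    // Assumed to match the "consumer." prefix that StreamsConfig.consumerPrefix() prepends.
    static final String CONSUMER_PREFIX = "consumer.";

    static Properties streamsProperties(String applicationId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", applicationId);
        // Prefixed key: applies only to the consumer clients that Kafka Streams creates.
        props.put(CONSUMER_PREFIX + "auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("demo-app"); // hypothetical application id
        System.out.println(props.getProperty("consumer.auto.offset.reset")); // prints "earliest"
    }
}
```

Keep in mind that auto.offset.reset only applies when the consumer group has no committed offsets. Since Kafka Streams uses application.id as the group id, an application id that has already committed offsets resumes from them regardless of this setting.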
