无效拓扑:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表

问题描述 投票:0回答:1

启动 Spring Boot 应用程序时遇到错误。

org.springframework.context.ApplicationContextException:无法启动bean“defaultKafkaStreamsBuilder”;嵌套异常是 org.springframework.kafka.KafkaException: 无法启动流: ;嵌套异常为 org.apache.kafka.streams.errors.TopologyException:拓扑无效:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表。 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.28.jar:5.3.28] 在 java.lang.Iterable.forEach(Iterable.java:86) ~[?:2.9 (09-29-2022)] 在 org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:937) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.28.jar:5.3.28] 在 org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.13.jar:2.7.13] 在 org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) ~[spring-boot-2.7.13.jar:2.7.13] 在 org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.13.jar:2.7.13] 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:307) ~[spring-boot-2.7.13.jar:2.7.13] 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) ~[spring-boot-2.7.13.jar:2.7.13] 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) ~[spring-boot-2.7.13.jar:2.7.13] 在 com.ibm.wce.scbn.api2api.api2apijobs.Api2apiJobsApplication.main(Api2apiJobsApplication.java:28) ~[classes/:?] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) ~[?:1.8.0] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90) ~[?:1.8.0] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) ~[?:1.8.0] 在 java.lang.reflect.Method.invoke(Method.java:508) ~[?:1.8.0] 在 org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:50) ~[spring-boot-devtools-2.7.13.jar:2.7.13] 导致:org.springframework.kafka.KafkaException:无法启动流:;嵌套异常为 org.apache.kafka.streams.errors.TopologyException:拓扑无效:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表。 在 org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:371) ~[spring-kafka-2.9.1.jar:2.9.1] 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.28.jar:5.3.28] ... 19 更多 引起原因:org.apache.kafka.streams.errors.TopologyException:拓扑无效:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表。 在 org.apache.kafka.streams.processor.internals.TopologyMetadata.getNumStreamThreads(TopologyMetadata.java:215) ~[kafka-streams-3.1.2.jar:?] 在 org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:916) ~[kafka-streams-3.1.2.jar:?] 在 org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:845) ~[kafka-streams-3.1.2.jar:?] 在 org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:751) ~[kafka-streams-3.1.2.jar:?] 在 org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:349) ~[spring-kafka-2.9.1.jar:2.9.1] 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.28.jar:5.3.28] ... 19 更多

代码:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.application.id}")
    private String applicationId;


    @Bean
    public KafkaStreams kafkaStreams(KafkaStreamsConfiguration streamsConfig) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        KStream < String, String > kStream = builder.stream("retry-events", Consumed.with(Serdes.String(), Serdes.String()));

        // Define a custom ObjectMapper for JSON parsing
        ObjectMapper objectMapper = new ObjectMapper();

        // Use mapValues to parse JSON and extract the timestamp
        kStream = kStream.mapValues(value - > {
            try {
                JsonNode jsonNode = objectMapper.readTree(value);
                long timestamp = jsonNode.get("timestamp").asLong();
                long tenMinutesAgo = System.currentTimeMillis() / 60000; // Current time in minutes
                if (timestamp <= tenMinutesAgo) {
                    return value; // Keep messages that match the criteria
                } else {
                    return null; // Filter out messages that don't match the criteria
                }
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }).filter((key, value) - > value != null); // Remove messages that failed parsing

        kStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

}
spring-boot spring-kafka apache-kafka-streams
1个回答
0
投票

当您使用 Spring Kafka Streams 时,您不需要定义和创建 KafkaStreams bean。这将由 Spring 为您完成,但是您确实需要为其提供某种拓扑来运行,正如您所注意到的。

最简单的配置方法是拥有一个 KStream 类型的 bean(无论您在此处选择什么键、值),然后使用 StreamBuilder 注入它。

这看起来像这样:

@Bean public KStream<String, String> topologyBuilder(StreamsBuilder streamsBuilder, YourTopologyClass topo) { return topo.processStream(streamsBuilder); }
你的 YourTopologyClass 看起来像这样:

class YourTopologyClass { public KStream<String, String> processStream(StreamsBuilder streamsBuilder) { // Your topology construction goes here } }
    
© www.soinside.com 2019 - 2024. All rights reserved.