我有一个应用程序,它使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。
public Topology buildTopology() {
KStream<String,String> inputStream = builder.stream("topicname");
KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
.filter((key,value) -> value!=null);
kTable = filteredStream.groupByKey()
.reduce(((value1, value2) -> value2));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
return builder.build();
}
我正在尝试使用 TopologyTestDriver 为此创建一个单元测试
private TopologyTestDriver td;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
private Topology topology;
private Properties streamConfig;
@BeforeEach
void setUp() {
streamConfig = new Properties();
streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);
inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}
@Test
void buildTopology(){
inputTopic.pipeInput("key1", "value1");
topology = app.buildTopology();
}
当我运行测试时,我收到异常“java.lang.IllegalArgumentException:未知主题:输入主题”
DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.
java.lang.IllegalArgumentException: Unknown topic: input-topic
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
at testclassname.buildTopology()
有人可以帮助我理解我在这里缺少什么吗?
Topology
用于初始化 TopologyTestDriver
:
topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);
当这个空拓扑用于实例化
TopologyTestDriver
和 td = new TopologyTestDriver(topology, streamConfig);
时,测试驱动程序不知道任何主题,因为没有有效构建拓扑。
我想这就是为什么,当您尝试使用
"input-topic"
将输入输入到 inputTopic.pipeInput("key1", "value1");
时,测试驱动程序会抛出一个 IllegalArgumentException
抱怨“Unknown topic: input-topic
”。
您应该调用
buildTopology()
方法来生成您正在测试的实际拓扑,并在创建 TopologyTestDriver
时使用它。
确保测试中的主题名称 (
input-topic
、output-topic
) 与实际应用程序中的主题名称 ("topicname"
) 相匹配。
@BeforeEach
void setUp() {
streamConfig = new Properties();
streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Create the topology using your actual code
topology = app.buildTopology();
// Now create a TopologyTestDriver using the real topology
td = new TopologyTestDriver(topology, streamConfig);
// The topic name here should match the actual topic you use in the real topology
inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());
// Create output topic if you need it
// outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}
@Test
void buildTopology(){
inputTopic.pipeInput("key1", "value1");
// Your assertions here
}
注意:我从设置中删除了输出主题,因为在您的代码片段中,您没有指定写入
KTable
的输出主题。如果您的实际应用程序写入输出主题,您可以将其添加回来。