我正在尝试创建一个自定义加入消费者来加入多个事件。
我创建了一个拓扑,它有四个子拓扑(subtopology-0、subtopology-1、subtopology-2、subtopology-3),但顺序与 topology.describe() 描述的顺序不同。
我在三个子拓扑(subtopology-0、subtopology-1、subtopology-2)中创建了一个状态存储,并尝试使用 .connectProcessorAndStateStores("PROCESS2", “COUNTS”)根据 kafka 开发人员指南https://kafka.apache.org/0110/documentation/streams/developer-guide
这是我如何创建处理器并将其附加到拓扑的代码片段。
class StreamCustomizer implements KafkaStreamsInfrastructureCustomizer {
public someMethod(StreamBuilder builder) {
Topology topology = builder.build();
topology.addProcessor("Processor1", new Processor() {...}, "state-store-1).addStateStore(store1,..);
topology.addProcessor("Processor2", new Processor() {...}, "state-store-1)
.addStateStore(store1,..);
topology.addProcessor("Processor3", new Processor() {...}, "state-store-1)
addStateStore(store1,..);
topology.addProcessor("Processor4", new Processor4() {...}, "Processor1", Processor2", "Processor3")
connectProcessorAndStateStores("Prcoessor4", "state-store-1", "state-store-2", "state-store-3");
}
}
这就是如上所述为所有子拓扑定义处理器的方式
new Processor {
private ProcessorContext;
private KeyValueStore<K, V> store;
init(ProcessorContext) {
this.context = context;
store = context.getStore("store-name");
}
}
这是处理器 4 写入的热点,所有状态存储都在 init 方法中从上下文存储中检索。
new Processor4() {
private KeyValueStore<K, V> store1;
private KeyValueStore<K, V> store2;
private KeyValueStore<K, V> store3;
}
我观察到一个奇怪的行为,使用上面的代码,store1、store2 和 store3 都被重新初始化,并且没有保留存储在各自子拓扑(1,2,3)中的任何密钥。 但是,相同的代码可以工作,即,当在类级别声明状态存储时,所有状态存储都保留存储在各自子拓扑中的键值。
class StreamCustomizer implements KafkaStreamsInfrastructureCustomizer {
private KeyValueStore <K, V> store1;
private KeyValueStore <K, V> store2;
private KeyValueStore <K, V> store3;
}
然后在处理器实现中,只需在 init 方法中初始化状态存储即可。
new Processor {
private ProcessorContext;
init(ProcessorContext) {
this.context = context;
store1 = context.getStore("store-name-1");
}
}
有人可以帮忙查找原因吗,或者这个拓扑是否有问题?另外,我读过状态存储可以在同一子拓扑中共享。
只要看到4个子拓扑,状态存储还没有共享,即没有正确连接。