Kafka 状态存储跨子拓扑共享

问题描述 投票:0回答:1

我正在尝试创建一个自定义加入消费者来加入多个事件。

我创建了一个拓扑,它有四个子拓扑(subtopology-0、subtopology-1、subtopology-2、subtopology-3),但顺序与 topology.describe() 描述的顺序不同。

我在三个子拓扑(subtopology-0、subtopology-1、subtopology-2)中创建了一个状态存储,并尝试使用 .connectProcessorAndStateStores("PROCESS2", “COUNTS”)根据 kafka 开发人员指南https://kafka.apache.org/0110/documentation/streams/developer-guide

这是我如何创建处理器并将其附加到拓扑的代码片段。

class StreamCustomizer implements  KafkaStreamsInfrastructureCustomizer {
   public someMethod(StreamBuilder builder) {
    Topology topology = builder.build();

    topology.addProcessor("Processor1", new Processor() {...}, "state-store-1).addStateStore(store1,..);
    topology.addProcessor("Processor2", new Processor() {...}, "state-store-1)
.addStateStore(store1,..);
    topology.addProcessor("Processor3", new Processor() {...}, "state-store-1)
addStateStore(store1,..);
topology.addProcessor("Processor4", new Processor4() {...}, "Processor1", Processor2", "Processor3")
connectProcessorAndStateStores("Prcoessor4", "state-store-1", "state-store-2", "state-store-3");
  

}
}

这就是如上所述为所有子拓扑定义处理器的方式

new Processor {
 private ProcessorContext;
private KeyValueStore<K, V> store;
  init(ProcessorContext) {
   this.context = context;
   store = context.getStore("store-name");
}
}

这是处理器 4 写入的热点,所有状态存储都在 init 方法中从上下文存储中检索。

new Processor4() {
private KeyValueStore<K, V> store1;
private KeyValueStore<K, V> store2;
private KeyValueStore<K, V> store3;
}

我观察到一个奇怪的行为,使用上面的代码,store1、store2 和 store3 都被重新初始化,并且没有保留存储在各自子拓扑(1,2,3)中的任何密钥。 但是,相同的代码可以工作,即,当在类级别声明状态存储时,所有状态存储都保留存储在各自子拓扑中的键值。

class StreamCustomizer implements KafkaStreamsInfrastructureCustomizer { private KeyValueStore <K, V> store1; private KeyValueStore <K, V> store2; private KeyValueStore <K, V> store3; }
然后在处理器实现中,只需在 init 方法中初始化状态存储即可。

new Processor { private ProcessorContext; init(ProcessorContext) { this.context = context; store1 = context.getStore("store-name-1"); } }
有人可以帮忙查找原因吗,或者这个拓扑是否有问题?另外,我读过状态存储可以在同一子拓扑中共享。

apache-kafka apache-kafka-streams
1个回答
0
投票
很难说(代码片段不太清楚),但是,通过共享状态,您可以有效地合并子拓扑。因此,如果你做得正确,你最终会得到一个包含所有处理器的子拓扑。

只要看到4个子拓扑,状态存储还没有共享,即没有正确连接。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.