与此问题相关: 我正在使用
spring-boot-starter-parent
版本 3.1.2
和 spring-cloud-stream-binder-kafka-streams
版本 4.0.3
我使用以下代码创建了一个 GlobalKTable:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
但是,我需要从计划任务(在我的活页夹流功能之外)访问此 GKT。尝试使用 InteractiveQueryService 访问商店会抛出
UnknownStateStoreException
。
我发现执行此操作的唯一方法是手动将 KafkaStream 对象注册为 Bean :
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer(
GenericApplicationContext applicationContext) {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
applicationContext.registerBean(
MY_BEAN_NAME, KafkaStreams.class, fb::getKafkaStreams);
}
catch (Exception e) {
}
};
}
然后在我的计划任务中从 ApplicationContext 检索 KafkaStreams Bean
@Scheduled
public void myTask() {
var ks = ((KafkaStreams) applicationContext.getBean(MY_BEAN_NAME));
var myGKT =
ks.store(
StoreQueryParameters.fromNameAndType(
MY_STORE_NAME, QueryableStoreTypes.keyValueStore()));
这看起来很迂回。您打算如何在最新版本的 Kafka Streams Binder 中访问 GKT?
从外部访问 GlobalKTable 或在 Kafka Streams 拓扑中创建的任何状态存储(例如在计划任务中)通常涉及 InteractiveQueryService。该服务提供了一种直接查询状态存储的方法,无需通过 Kafka。
您遇到的 UnknownStateStoreException 通常会发生,因为状态存储尚未准备好,或者 Kafka Streams 还不知道。这可能是由于以下几个原因造成的:
我知道您正在寻找一种更简化的方法。为了获得更好的体验,请按照以下步骤操作:
@Autowired
private InteractiveQueryService interactiveQueryService;
@Scheduled
public void myTask() {
try {
ReadOnlyKeyValueStore<String, MyValue> store =
interactiveQueryService.getQueryableStore(MY_STORE_NAME, QueryableStoreTypes.keyValueStore());
// ... Use the store to fetch values.
} catch (UnknownStateStoreException e) {
// Handle or log the exception. You can retry after a delay.
}
}
@StreamListener(target = Processor.INPUT, condition = "headers['type']=='...'")
public void input(...) {
// Your logic ...
}
@StreamListener("process-0") // This assumes your input channel name is process-0
public void onEvent(...) {
// Once this is called, it means your Kafka Streams instance is running and the state stores should be available.
}
通过这些步骤,您应该能够使用 InteractiveQueryService 访问您的 GlobalKTable,而无需直接处理 KafkaStreams 对象或 ApplicationContext。请记住始终处理 UnknownStateStoreException 异常,因为存储可能并不总是立即可用。