Kafka Streams Binder:访问全局状态存储

问题描述 投票:0回答:1

此问题相关: 我正在使用

spring-boot-starter-parent
版本
3.1.2
spring-cloud-stream-binder-kafka-streams
版本
4.0.3

我使用以下代码创建了一个 GlobalKTable:

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

但是,我需要从计划任务(在我的活页夹流功能之外)访问此 GKT。尝试使用 InteractiveQueryService 访问商店会抛出

UnknownStateStoreException

我发现执行此操作的唯一方法是手动将 KafkaStream 对象注册为 Bean :

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer(
    GenericApplicationContext applicationContext) {

    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
            applicationContext.registerBean(
                MY_BEAN_NAME, KafkaStreams.class, fb::getKafkaStreams);
        }
        catch (Exception e) {

        }
    };
}

然后在我的计划任务中从 ApplicationContext 检索 KafkaStreams Bean

@Scheduled
public void myTask() {
  var ks = ((KafkaStreams) applicationContext.getBean(MY_BEAN_NAME));
    var myGKT =
        ks.store(
            StoreQueryParameters.fromNameAndType(
                MY_STORE_NAME, QueryableStoreTypes.keyValueStore()));

这看起来很迂回。您打算如何在最新版本的 Kafka Streams Binder 中访问 GKT?

java spring apache-kafka apache-kafka-streams spring-cloud-stream-binder-kafka
1个回答
0
投票

从外部访问 GlobalKTable 或在 Kafka Streams 拓扑中创建的任何状态存储(例如在计划任务中)通常涉及 InteractiveQueryService。该服务提供了一种直接查询状态存储的方法,无需通过 Kafka。

您遇到的 UnknownStateStoreException 通常会发生,因为状态存储尚未准备好,或者 Kafka Streams 还不知道。这可能是由于以下几个原因造成的:

  1. 名称不匹配:确保您在 InteractiveQueryServiceKafkaStreams.store(...) 中使用的名称是您商店的确切名称。
  2. 您的应用程序尚未运行:确保您的流应用程序处于RUNNING状态。在达到此状态之前,商店可能无法查询。
  3. 并非所有实例都有全局存储GlobalKTable(或任何全局存储),顾名思义,是全局的。这意味着您的服务的所有实例都应该有它的完整副本。确保所有实例都已加载并处理存储。

我知道您正在寻找一种更简化的方法。为了获得更好的体验,请按照以下步骤操作:

  1. 状态存储已就绪:您可以监听 StreamsBuilderFactoryBean 以确保状态存储已就绪。
@Autowired
private InteractiveQueryService interactiveQueryService;

@Scheduled
public void myTask() {
    try {
        ReadOnlyKeyValueStore<String, MyValue> store = 
            interactiveQueryService.getQueryableStore(MY_STORE_NAME, QueryableStoreTypes.keyValueStore());

        // ... Use the store to fetch values.
    } catch (UnknownStateStoreException e) {
        // Handle or log the exception. You can retry after a delay.
    }
}
  1. 添加 StreamListener 生命周期挂钩:您可以使用 @StreamListener 生命周期挂钩来了解流何时准备就绪。
@StreamListener(target = Processor.INPUT, condition = "headers['type']=='...'")
public void input(...) {
    // Your logic ...
}

@StreamListener("process-0")  // This assumes your input channel name is process-0
public void onEvent(...) {
    // Once this is called, it means your Kafka Streams instance is running and the state stores should be available.
}
  1. 配置:确保已定义spring.cloud.stream.kafka.streams.binder.configuration.application.server。这是 InteractiveQueryService 所需要的。

通过这些步骤,您应该能够使用 InteractiveQueryService 访问您的 GlobalKTable,而无需直接处理 KafkaStreams 对象或 ApplicationContext。请记住始终处理 UnknownStateStoreException 异常,因为存储可能并不总是立即可用。

© www.soinside.com 2019 - 2024. All rights reserved.