[我试图绕过Kafka Streams,并提出一些我似乎无法自行解决的基本问题。我了解KTable
和Kafka州立商店的概念,但是在决定如何解决这一问题时遇到了麻烦。我也在使用Spring Cloud Streams,在此之上又增加了另一层次的复杂性。
我的用例:
我有一个规则引擎,可以读取Kafka事件,处理该事件,返回匹配的规则列表并将其写入另一个主题。这是我到目前为止的内容:
@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}
public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
// Does stuff
}
一些有状态规则看起来像:
[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]
我当前的实现方式是将所有这些信息存储在内存中,以便能够执行分析。由于明显的原因,它不容易扩展。所以我想我会坚持到卡夫卡州立商店。
我不确定执行此操作的最佳方法。我知道有一种创建自定义状态存储的方法,可以提供更高的灵活性。我不确定Kafka DSL是否支持此功能。
仍然是Kafka Streams的新手,不介意听到各种各样的建议。
根据您的描述,我相信仍然可以使用Kafka Streams中的DSL来实现此用例。上面显示的代码不会跟踪任何状态。在拓扑中,您需要通过跟踪规则的计数来添加状态并将其存储在状态存储中。然后,仅在该计数达到阈值时才需要发送输出规则。这是伪代码背后的一般思想。显然,您必须对此进行调整,才能满足用例的特定规范。