已弃用 KStreams TransformerSupplier 至 ProcessorSupplier

问题描述 投票:0回答:1

鉴于 flatTransform 已弃用,我正在尝试按照建议将其替换为 process

我以前的TransformerSupplier看起来像这样:

public class MyTransformerSupplier implements TransformerSupplier<A, B, Iterable<C, D>> {

  @Override
  public Transformer<A, B, Iterable<C, D>> get() {
    return new MyTransformer();
  }

  @Override
  public Set<StoreBuilder<?>> stores() {
    // Declare a state store
    return Set.of(someStore);
  }
}

我实际的 ProcessorSupplier 看起来像这样:

public class MyProcessorSupplier implements ProcessorSupplier<A, B, C, D> {

  @Override
  public Processor<A, B, C, D> get() {
    return new MyProcessor();
  }

  @Override
  public Set<StoreBuilder<?>> stores() {
    // Unmodified declaration of the state store
    return Set.of(someStore);
  }
}

我之前的 Transformer 和新的 ContextualProcessor 之间唯一显着的区别是,不是返回列表,而是在内部执行对

context().forward(...)
的多次调用。

错误恰恰在于

context()
对于我的新 ContextualProcessor 为空。
你能帮我理解为什么吗?

java apache-kafka apache-kafka-streams
1个回答
0
投票

您应该重写方法

init()
,在那里您可以初始化一个由 Kafka 流自动提供的变量上下文。

在 Scala 中,你可以做这样的事情

private var context: ProcessorContext = _ 

override def init(context: ProcessorContext): Unit = {
   this.context = context 
}

然后您可以使用该变量转发记录。

© www.soinside.com 2019 - 2024. All rights reserved.