鉴于 flatTransform 已弃用,我正在尝试按照建议将其替换为 process。
我以前的TransformerSupplier看起来像这样:
public class MyTransformerSupplier implements TransformerSupplier<A, B, Iterable<C, D>> {
@Override
public Transformer<A, B, Iterable<C, D>> get() {
return new MyTransformer();
}
@Override
public Set<StoreBuilder<?>> stores() {
// Declare a state store
return Set.of(someStore);
}
}
我实际的 ProcessorSupplier 看起来像这样:
public class MyProcessorSupplier implements ProcessorSupplier<A, B, C, D> {
@Override
public Processor<A, B, C, D> get() {
return new MyProcessor();
}
@Override
public Set<StoreBuilder<?>> stores() {
// Unmodified declaration of the state store
return Set.of(someStore);
}
}
我之前的 Transformer 和新的 ContextualProcessor 之间唯一显着的区别是,不是返回列表,而是在内部执行对
context().forward(...)
的多次调用。
错误恰恰在于
context()
对于我的新 ContextualProcessor 为空。您应该重写方法
init()
,在那里您可以初始化一个由 Kafka 流自动提供的变量上下文。
在 Scala 中,你可以做这样的事情
private var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
然后您可以使用该变量转发记录。