我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。
例如,在输入主题中,您获取属于某个学校的所有学生的数据。现在在输出主题中,您只需要有关基于班级的一部分学生的信息。
记录:
student_name | student_id | student_class
最初我们认为我们会使用学生对象来做到这一点,但这意味着我们需要反序列化该对象,然后进行过滤。相反,我们想要做的是为每条记录传递标头。此标题将包含学生的班级信息。
标题:
class: v
我们想知道是否有办法使用 kafka 流来做到这一点。我们以为我们可以在过滤器函数中使用标头,但是过滤器函数没有标头信息。
kstreams.filter((k,v) -> {
howToAccessHeaders?
})
我们也尝试过使用process函数,但是同样不清楚如何使用那里的记录来过滤。
kstream.process(new CustomerProcessor())
自定义处理器:
class CustomProcessor implements Processor<String, byte[], String, byte[]> {
@Override
public void process(Record<String, byte[]> record) {
if(record.headers().lastHeader("class").value().toString() == "v"){
//Without return value, how does the record gets filtered?
}
}
}
我们还可以做些什么来使用标题过滤记录吗?或者使用 kafkaStreams 不可能吗?
PS:我们尝试使用 tranform 和 TransformValues 函数,但它们现在已被弃用。
编写自定义
Processor
是正确的方法。要向下游转发记录,即您要保留的记录,您可以使用 ProcessorContext#forward(...)
。相应的 ProcessorContext
对象通过您需要覆盖的 Processor
方法传递到 Processor#init(...)
中。 -- 如果您不对输入记录调用 forward()
,它就会掉在地板上并被过滤掉。