Kafka 流使用标头过滤消息

问题描述 投票:0回答:1

我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。

例如,在输入主题中,您获取属于某个学校的所有学生的数据。现在在输出主题中,您只需要有关基于班级的一部分学生的信息。

记录:

student_name | student_id | student_class

最初我们认为我们会使用学生对象来做到这一点,但这意味着我们需要反序列化该对象,然后进行过滤。相反,我们想要做的是为每条记录传递标头。此标题将包含学生的班级信息。

标题:

class: v

我们想知道是否有办法使用 kafka 流来做到这一点。我们以为我们可以在过滤器函数中使用标头,但是过滤器函数没有标头信息。

    kstreams.filter((k,v) -> {
       howToAccessHeaders?
})

我们也尝试过使用process函数,但是同样不清楚如何使用那里的记录来过滤。

kstream.process(new CustomerProcessor())

自定义处理器:

class CustomProcessor implements Processor<String, byte[], String, byte[]> {

    @Override
    public void process(Record<String, byte[]> record) {
        if(record.headers().lastHeader("class").value().toString() == "v"){
            //Without return value, how does the record gets filtered? 
        }
    }
}

我们还可以做些什么来使用标题过滤记录吗?或者使用 kafkaStreams 不可能吗?

PS:我们尝试使用 tranform 和 TransformValues 函数,但它们现在已被弃用。

java apache-kafka apache-kafka-streams
1个回答
0
投票

编写自定义

Processor
是正确的方法。要向下游转发记录,即您要保留的记录,您可以使用
ProcessorContext#forward(...)
。相应的
ProcessorContext
对象通过您需要覆盖的
Processor
方法传递到
Processor#init(...)
中。 -- 如果您不对输入记录调用
forward()
,它就会掉在地板上并被过滤掉。

参考https://docs.confluence.io/platform/current/streams/developer-guide/processor-api.html#accessing-processor-context

© www.soinside.com 2019 - 2024. All rights reserved.