Kafka Processor API 中 Header 有什么用?

问题描述 投票:0回答:2

我正在学习 Kafka Processor API,并在

ProcessorContext
中找到一个方法头。

headers​()

返回当前输入记录的表头;可能 如果不可用则为 null

这个方法有什么用?

docs中只写了一行:

返回当前输入记录的表头;如果它可以为空 不可用

我可以对此执行一些操作,例如添加吗?

apache-kafka apache-kafka-streams kafka-producer-api
2个回答
3
投票

标头是某种可以附加到每条消息的元数据。标头可用于各种场景,例如附加可在过滤记录时使用的信息等。


您可以通过处理器 API 访问消息的元数据,更准确地说是

process()
transform()
transformValues()
。对于示例,为了向记录添加标题,可以使用以下方法:

public void process(String key, String value) {

    // add a header to the elements
    context().headers().add.("key", "value")
}

0
投票

标记为已接受的答案并不完全正确。

标题实际上已添加到记录中,但记录并未发送回主题。

为此我们需要使用forward方法:

public void process(Record<RecordKey, RecordValue> record) {

        // add a header to the elements
        record.headers().add("key", "value".getBytes());

        // forward the record
        context.forward(record);
}
© www.soinside.com 2019 - 2024. All rights reserved.