我正在学习 Kafka Processor API,并在
ProcessorContext
中找到一个方法头。
headers()
返回当前输入记录的表头;可能 如果不可用则为 null
这个方法有什么用?
在docs中只写了一行:
返回当前输入记录的表头;如果它可以为空 不可用
我可以对此执行一些操作,例如添加吗?
标头是某种可以附加到每条消息的元数据。标头可用于各种场景,例如附加可在过滤记录时使用的信息等。
您可以通过处理器 API 访问消息的元数据,更准确地说是
process()
、transform()
和 transformValues()
。对于示例,为了向记录添加标题,可以使用以下方法:
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}
标记为已接受的答案并不完全正确。
标题实际上已添加到记录中,但记录并未发送回主题。
为此我们需要使用forward方法:
public void process(Record<RecordKey, RecordValue> record) {
// add a header to the elements
record.headers().add("key", "value".getBytes());
// forward the record
context.forward(record);
}