我正在学习Kafka处理器API,并在ProcessorContext
中找到一个方法标头。
headers()
返回当前输入记录的标题;可能如果不可用,则返回null
此方法的用途是什么?
在docs中只写了一行:
返回当前输入记录的标题;如果它可以为null不可用
我可以对此执行一些操作,例如添加吗?
标头是可以添加到每条消息的某种元数据。标头可用于各种情况,包括包含可在过滤记录等时使用的信息。
您可以通过处理器API,更确切地说是process()
,transform()
和transformValues()
访问消息的元数据。对于example,为了将标题添加到记录,将执行以下操作:
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}