从 Kafka 读取的实际消息:
{
"body": {
"metadata": {
"id": "bce16e11"
},
"eventDetails": {
"eventID": "c5f615f1",
"customerId": "123456789",
"Name": "NEW"
}
}
}
想要向消息中添加Kafka偏移量和分区,如下所示。
{
"body" : {
"metadata" : {
"id" : "bce16e11",
"kafkaOffset" : 4537732,
"kafkaPaitition" : 4
},
"eventDetails" : {
"eventID" : "c5f615f1",
"customerId" : "123456789",
"Name" : "NEW"
}
}
}
您可以使用 JoltTransformJSON 处理器以及包含 # 通配符的 shift 规范,以强调固定值,例如:
[
{
"operation": "shift",
"spec": {
"body": {
"metadata": {
"*": "&2.&1.&", //represents all of the existing elements within the "metadata" object
//&1 stands for "metadata" key
//&2 stands for "body" key
"#4537732": "&2.&1.kafkaOffset",
"#4": "&2.&1.kafkaPaitition"
},
"*": "&1.&" //represents the elements other than "metadata" object
//&1 stands for "body" key
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"body": {
"metadata": {
"*": "=toInteger"
}
}
}
}
]
或者直接使用修改规范:
[
{
"operation": "modify-overwrite-beta",
"spec": {
"body": {
"metadata": {
"kafkaOffset": "=@(4537732)",
"kafkaPaitition": "=@(4)"
}
}
}
}
]