如何使用NiFi在消息的元数据部分添加Kafka偏移、分区详细信息

问题描述 投票:0回答:1

从 Kafka 读取的实际消息:

{
  "body": {
    "metadata": {
      "id": "bce16e11"
    },
    "eventDetails": {
      "eventID": "c5f615f1",
      "customerId": "123456789",
      "Name": "NEW"
    }
  }
}

想要向消息中添加Kafka偏移量和分区,如下所示。

{
  "body" : {
    "metadata" : {
      "id" : "bce16e11",
      "kafkaOffset" : 4537732,
      "kafkaPaitition" : 4
    },
    "eventDetails" : {
      "eventID" : "c5f615f1",
      "customerId" : "123456789",
      "Name" : "NEW"
    }
  }
}
json transform apache-nifi jolt
1个回答
0
投票

您可以使用 JoltTransformJSON 处理器以及包含 # 通配符的 shift 规范,以强调固定值,例如:

[
  {
    "operation": "shift",
    "spec": {
      "body": {
        "metadata": {
          "*": "&2.&1.&", //represents all of the existing elements within the "metadata" object
                          //&1 stands for "metadata" key
                          //&2 stands for "body" key
          "#4537732": "&2.&1.kafkaOffset",
          "#4": "&2.&1.kafkaPaitition"
        },
        "*": "&1.&" //represents the elements other than "metadata" object
          //&1 stands for "body" key
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "body": {
        "metadata": {
          "*": "=toInteger"
        }
      }
    }
  }
]

或者直接使用修改规范:

[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "body": {
        "metadata": {
          "kafkaOffset": "=@(4537732)",
          "kafkaPaitition": "=@(4)"
        }
      }
    }
  }
]
© www.soinside.com 2019 - 2024. All rights reserved.