How to fix Kafka Connect JsonConverter "Schema must contain 'type' field"

Question · votes: 0 · answers: 1

I am trying to push messages to a JdbcSink. The message looks like this:

{
  "schema": {
    "type": "struct",
    "fields": [{
      "field": "ID",
      "type": {
        "type": "bytes",
        "scale": 0,
        "precision": 64,
        "connect.version": 1,
        "connect.parameters": {
          "scale": "0"
        },
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "logicalType": "decimal"
      }
    }, {
      "field": "STORE_DATE",
      "type": ["null", {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }],
      "default": null
    }, {
      "field": "DATA",
      "type": ["null", "string"],
      "default": null
    }],
    "name": "KAFKA_STREAM"
  },
  "payload": {
    "ID": 17,
    "STORE_DATE": null,
    "DATA": "THIS IS TEST DATA"
  }
}

But it always throws this error:

Caused by: org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field

Here is the connector configuration currently in use:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "DEV_KAFKA_STREAM",
    "connection.url": "url",
    "connection.user": "user",
    "connection.password": "password",
    "insert.mode": "insert",
    "table.name.format": "KAFKA_STREAM",
    "pk.fields": "ID",
    "auto.create": "false",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}

I don't know how to debug this or how to find the root cause, since the JSON does have a "type" field.

jdbc apache-kafka apache-kafka-connect confluent
1 Answer

Score: 0

As far as I know, "long" is not a valid schema type.

You want "int64".

JSON Schema source code

You may also want to remove the unions. There is an "optional" key for marking nullable fields.
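For example, the STORE_DATE field could be declared with "int64" and "optional" instead of an Avro-style union. This is a sketch of the Connect JSON schema format, not tested against your connector:

```json
{
  "field": "STORE_DATE",
  "type": "int64",
  "optional": true,
  "name": "org.apache.kafka.connect.data.Timestamp",
  "version": 1
}
```

Note that JsonConverter uses "name" and "version" keys directly on the field schema, rather than the "connect.name"/"logicalType" keys shown in your message, which come from Avro-converted output.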

Kafka Connect JDBC sink connector not working

If you are creating that JSON in Java, you should use the SchemaBuilder and Envelope classes around the two JSONNode objects to ensure you construct a valid payload.
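As a sketch of that approach using classes from org.apache.kafka.connect.data (the schema shape mirrors your message; adapt the field names to your code):

```java
import java.math.BigDecimal;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

public class StreamSchema {
    public static void main(String[] args) {
        // Build the value schema: ID as a Decimal with scale 0,
        // plus a nullable timestamp and a nullable string.
        Schema valueSchema = SchemaBuilder.struct()
                .name("KAFKA_STREAM")
                .field("ID", Decimal.schema(0))
                .field("STORE_DATE", Timestamp.builder().optional().build())
                .field("DATA", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        // Populate a matching Struct; passing it through
        // JsonConverter.fromConnectData would then produce a
        // message with a well-formed "schema" envelope.
        Struct value = new Struct(valueSchema)
                .put("ID", new BigDecimal(17))
                .put("STORE_DATE", null)
                .put("DATA", "THIS IS TEST DATA");
        value.validate();
    }
}
```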

© www.soinside.com 2019 - 2024. All rights reserved.