Kafka Connect cannot send schemaless JSON

Problem description

I have a Zabbix webhook service that sends data to Kafka in JSON format:

  "value": {
  "name": "Zabbix server: Trend write cache, % used",
  "key": "zabbix[wcache,trend,pused]",
  "host": "Zabbix server",
  "groups": [
    "Zabbix servers"
  ],
  "applications": [],
  "itemid": 23276,
  "clock": 1670102276,
  "ns": 427825202,
  "value": 0.129509
}

I need to send this data to Postgres via Kafka Connect's JdbcSinkConnector:

curl -X POST "http://localhost:8082/connectors" -H "Content-Type: application/json" -d '{ 
    "name": "zabbix-sink-connector", 
    "config": { 
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", 
      "tasks.max": 1,
      "topics": "zabbix-webhook", 
      "connection.url": "jdbc:postgresql://<host>/etl?currentSchema=test", 
      "connection.user": "<username>", 
      "connection.password": "<password>", 
      "auto.create": "true"
    }
  }'

But when I execute this curl command, I get an error:

    ERROR WorkerSinkTask{id=zabbix-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask)
    org.apache.kafka.connect.errors.ConnectException: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema 

Following this article https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-schema I changed the variable CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE to false, but I still get the same error.

How should I send this JSON data?

apache-kafka apache-kafka-connect
1 Answer

3 votes

The JDBC Sink connector requires a schema. With the JSONConverter, it cannot accept JSON without value.converter.schemas.enable=true, and you cannot disable that.
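For reference, with schemas.enable=true the JSONConverter expects every record value to carry an inline schema/payload envelope. A minimal sketch covering a subset of the fields in the sample message (the types are assumptions inferred from the sample values):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "name",   "type": "string", "optional": true },
      { "field": "host",   "type": "string", "optional": true },
      { "field": "itemid", "type": "int64",  "optional": true },
      { "field": "clock",  "type": "int64",  "optional": true },
      { "field": "value",  "type": "double", "optional": true }
    ]
  },
  "payload": {
    "name": "Zabbix server: Trend write cache, % used",
    "host": "Zabbix server",
    "itemid": 23276,
    "clock": 1670102276,
    "value": 0.129509
  }
}

The Zabbix webhook sends bare JSON with no such envelope, which is why the converter hands the sink a schemaless HashMap, as the error message says.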

More specifically, the JDBC connector requires a schema because it cannot guarantee or extract known key names from plain JSON, nor know that they will have consistent value types. It needs that information to build SQL statements (CREATE/ALTER TABLE DDL, or INSERT/UPDATE/DELETE DML).
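To illustrate, with auto.create=true the sink must derive something like the following from each record's schema. Everything below is hypothetical, inferred from the sample payload; by default the sink names the table after the topic:

-- Hypothetical DDL the sink would issue for auto.create=true
CREATE TABLE "zabbix-webhook" (
  "name"   TEXT,
  "host"   TEXT,
  "itemid" BIGINT,
  "clock"  BIGINT,
  "value"  DOUBLE PRECISION
);

-- Hypothetical DML for the sample record
INSERT INTO "zabbix-webhook" ("name", "host", "itemid", "clock", "value")
VALUES ('Zabbix server: Trend write cache, % used', 'Zabbix server',
        23276, 1670102276, 0.129509);

None of these column names or types can be derived reliably from schemaless JSON.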

Since you cannot control the Kafka producer, one way to get data with a schema is to use ksqlDB to convert the consumed JSON data to JSONSchema or another binary structured format; Connect can then write that data to the database.
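A minimal sketch of that approach, assuming the payload shape from the question. The stream names, the output topic, and the choice of AVRO are illustrative (JSON_SR would work the same way), and ksqlDB needs a Schema Registry configured for either format:

-- 1) Declare a stream over the raw topic, supplying the schema by hand.
--    Backticks quote `value`, which is both the wrapper and an inner field.
CREATE STREAM zabbix_raw (
  `value` STRUCT<
    name    VARCHAR,
    host    VARCHAR,
    itemid  BIGINT,
    clock   BIGINT,
    ns      BIGINT,
    `value` DOUBLE
  >
) WITH (KAFKA_TOPIC='zabbix-webhook', VALUE_FORMAT='JSON');

-- 2) Re-serialize to a schema-carrying format on a new topic.
CREATE STREAM zabbix_avro
  WITH (KAFKA_TOPIC='zabbix-webhook-avro', VALUE_FORMAT='AVRO') AS
  SELECT `value`->name    AS name,
         `value`->host    AS host,
         `value`->itemid  AS itemid,
         `value`->clock   AS clock,
         `value`->`value` AS metric_value
  FROM zabbix_raw;

The JDBC sink would then consume zabbix-webhook-avro with value.converter=io.confluent.connect.avro.AvroConverter and a value.converter.schema.registry.url, at which point it has the Struct schema it needs.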
