I have a Zabbix webhook service that sends data to Kafka in JSON format:
"value": {
"name": "Zabbix server: Trend write cache, % used",
"key": "zabbix[wcache,trend,pused]",
"host": "Zabbix server",
"groups": [
"Zabbix servers"
],
"applications": [],
"itemid": 23276,
"clock": 1670102276,
"ns": 427825202,
"value": 0.129509
}
I need to send this data to Postgres through Kafka Connect's JdbcSinkConnector:
curl -X POST "http://localhost:8082/connectors" -H "Content-Type: application/json" -d '{
"name": "zabbix-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "zabbix-webhook",
"connection.url": "jdbc:postgresql://<host>/etl?currentSchema=test",
"connection.user": "<username>",
"connection.password": "<password>",
"auto.create": "true"
}
}'
But when I execute this curl request, I get the following error:
ERROR WorkerSinkTask{id=zabbix-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema
Following this article https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-schema, I changed the CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE variable to false, but I still get the same error.
How should I send this JSON data?
The JDBC Sink connector requires a schema. With the JsonConverter it cannot accept JSON unless value.converter.schemas.enable=true, and that requirement cannot be disabled.
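For reference, when schemas.enable=true the JsonConverter expects every message value to carry a schema/payload envelope. A minimal sketch of what one of your records would have to look like (field list trimmed; the Connect type names are my mapping of your sample values):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "name", "type": "string" },
      { "field": "host", "type": "string" },
      { "field": "itemid", "type": "int64" },
      { "field": "clock", "type": "int64" },
      { "field": "value", "type": "float64" }
    ]
  },
  "payload": {
    "name": "Zabbix server: Trend write cache, % used",
    "host": "Zabbix server",
    "itemid": 23276,
    "clock": 1670102276,
    "value": 0.129509
  }
}

The Zabbix webhook does not produce this envelope, which is why toggling CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE alone cannot fix the error.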
More specifically, the JDBC connector requires a schema because it cannot guarantee or extract known key names from plain JSON, nor know that their values will have consistent types. It needs that information to build SQL statements (CREATE/ALTER TABLE DDL, and INSERT/UPDATE/DELETE DML).
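To illustrate, with auto.create=true the connector has to derive both the table (named after the topic by default) and the prepared insert from each record's Struct schema, roughly like this (the Postgres column types are assumptions based on your sample values):

-- illustration only: what the sink would generate from a Struct schema
CREATE TABLE "zabbix-webhook" (
  "name" TEXT,
  "host" TEXT,
  "itemid" BIGINT,
  "clock" BIGINT,
  "value" DOUBLE PRECISION
);
INSERT INTO "zabbix-webhook" ("name", "host", "itemid", "clock", "value")
VALUES (?, ?, ?, ?, ?);

None of this can be derived from a schemaless HashMap, which is exactly what the error message is complaining about.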
Since you don't control the Kafka producer, the way to get schema'd data is to use ksqlDB to convert the consumed JSON data into JSON Schema, Avro, or another binary structured format, which Connect can then write to the database.
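A minimal ksqlDB sketch of that approach, assuming the message value is shaped like the object you posted (the stream and output topic names are mine, and the Avro output requires a Schema Registry):

-- register the raw JSON topic; `value` and `key` are backtick-quoted reserved words
CREATE STREAM zabbix_raw (
  `value` STRUCT<
    name VARCHAR,
    `key` VARCHAR,
    host VARCHAR,
    itemid BIGINT,
    clock BIGINT,
    ns BIGINT,
    `value` DOUBLE
  >
) WITH (KAFKA_TOPIC='zabbix-webhook', VALUE_FORMAT='JSON');

-- re-serialize to Avro on a new topic; ksqlDB registers the schema automatically
CREATE STREAM zabbix_avro WITH (KAFKA_TOPIC='zabbix-webhook-avro', VALUE_FORMAT='AVRO') AS
SELECT
  `value`->name AS name,
  `value`->`key` AS item_key,
  `value`->host AS host,
  `value`->itemid AS itemid,
  `value`->clock AS clock,
  `value`->`value` AS metric_value
FROM zabbix_raw;

The sink connector can then read the new topic with the AvroConverter instead (the schema-registry URL is a placeholder for your environment):

"topics": "zabbix-webhook-avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"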