我正在尝试用Kafka HDFS Sink编写JSON。
我有以下属性(connect-standalone.properties):
key.converter.schemas.enable = false
value.converter.schemas.enable = false
schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
在我的属性上:
format.class=io.confluent.connect.hdfs.json.JsonFormat
我得到以下例外:
org.apache.kafka.connect.errors.DataException:由于存在锯齿错误,将byte []转换为Kafka连接失败
...引起:org.apache.kafka.commom.errors.SerlizationException:com.fasterxml.jackson.core.JsonParseException:无法识别的标记'test':期待'null','true','false'或NaN at [来源:( byte [])“test”行:1栏:11]
我的JSON是有效的。
我该怎么解决呢?
*我也尝试使用样本JSON,如:
{"key":"value"}
还是一样的错误。
谢谢。
根据错误,并非主题中的所有消息都是JSON对象。最新消息可能有效,或者Kafka值可能有效(但不是密钥),但错误显示它尝试读取无效的字符串(byte[])"test"
,该字符串无效
如果您只想将文本数据导入HDFS,则可以使用String格式,但不会有Hive集成
format.class=io.confluent.connect.hdfs.string.StringFormat
如果你确实想要使用这种格式的Hive,你需要自己定义JSON Serde