我想将afro转换器与kafka连接jdbc接收器连接器。
这些是我的avro配置:
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://myurl.com" ,
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://myurl.com" ,
模式设置为false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
现在,当我启动连接器时,出现此错误
原因:org.apache.kafka.connect.errors.ConnectException:值模式必须为Struct类型
根据我的读物,Struct用于json模式,对吗?如果我使用的是Avro模式,我应该没有任何结构?
Avro模式类型是:记录,枚举,数组,映射,联合和固定的,但没有结构。
我想念什么?
谢谢!
Avro 记录创建Struct
Connect数据类型。
错误是您的数据不是记录。
模式设置为false
这些属性对Avro转换器没有任何意义。 Avro 总是具有架构
我想将afro转换器与kafka连接jdbc接收器连接器。
然后,生产者需要发送带有模式的记录。这包括具有模式enabled
的Avro记录或JSON是的,我实际上在一个主题中有Avro记录,等待连接器选择它们。但是我不确定我为什么会收到此错误?如果avro记录创建了结构连接数据类型,为什么我的连接器说它需要结构?
事实上,我仔细检查后发现我的kafka附加程序未发布好的架构。
这是已推送的内容
{"subject":"npl01-pub-wfm-wom_nrt_central_logging-t-uat-
value","version":2,"magic":1,"keytype":"SCHEMA"}-{"subject":"npl01-pub-wfm-
wom_nrt_central_logging-t-uat-
value","version":2,"id":203,"schema":"\"bytes\"","deleted":false}
由于我的模式为:,我不知道为什么要推送“字节”:>
{ "type":"record", "name":"CentralLoggingValue", "namespace":"com.bell.kafka.wot.model", "fields": [ {"name":"id","type":"string"}, {"name":"eventtime_est","type":"string"}, {"name":"businesskey","type":["null", "string"]}, {"name":"systemname","type":"string"}, {"name":"hostname","type":"string"}, {"name":"eventname","type":"string"}, {"name":"log_level","type":"string"}, {"name":"xmlpayload","type":["null", "string"]}, {"name":"xmlresponse","type":["null", "string"]}, {"name":"wait_time_ms","type":["null", "string"]}, {"name":"async_correlationid","type":["null", "string"]}, {"name":"resourceskey","type":["null", "string"]}, {"name":"genericpayload","type":["null", "string"]}, {"name":"genericresponse","type":["null", "string"]} ] }
这是我的lof4j2 kafka附加程序:
<Kafka name="CentralLoggingKafkaAppender" topic="npl01-pub-wfm- wom_nrt_central_logging-t-uat"> <PatternLayout pattern="%m"/> <Property name="bootstrap.servers">myserver:9092</Property> <Property name = "value.serializer">io.confluent.kafka.serializers.KafkaAvroSerializer</Property> <Property name = "schema.registry.url">http://myurl:port</Property> </Kafka>
您知道发生了什么吗?