[kafka connect jdbc sink值模式必须为Struct类型

问题描述 投票:0回答:3

我想将afro转换器与kafka连接jdbc接收器连接器。

这些是我的avro配置:

"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://myurl.com" ,
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://myurl.com" ,

模式设置为false

key.converter.schemas.enable=false
value.converter.schemas.enable=false

现在,当我启动连接器时,出现此错误

原因:org.apache.kafka.connect.errors.ConnectException:值模式必须为Struct类型

根据我的读物,Struct用于json模式,对吗?如果我使用的是Avro模式,我应该没有任何结构?

Avro模式类型是:记录,枚举,数组,映射,联合和固定的,但没有结构。

我想念什么?

谢谢!

jdbc apache-kafka avro apache-kafka-connect
3个回答
1
投票

Avro 记录创建Struct Connect数据类型。

错误是您的数据不是记录。

模式设置为false

这些属性对Avro转换器没有任何意义。 Avro 总是具有架构

我想将afro转换器与kafka连接jdbc接收器连接器。

然后,生产者需要发送带有模式的记录。这包括具有模式enabled

的Avro记录或JSON

0
投票

是的,我实际上在一个主题中有Avro记录,等待连接器选择它们。但是我不确定我为什么会收到此错误?如果avro记录创建了结构连接数据类型,为什么我的连接器说它需要结构?


0
投票

事实上,我仔细检查后发现我的kafka附加程序未发布好的架构。

这是已推送的内容

{"subject":"npl01-pub-wfm-wom_nrt_central_logging-t-uat- 
value","version":2,"magic":1,"keytype":"SCHEMA"}-{"subject":"npl01-pub-wfm- 
wom_nrt_central_logging-t-uat- 
value","version":2,"id":203,"schema":"\"bytes\"","deleted":false}

由于我的模式为:,我不知道为什么要推送“字节”:>

{
"type":"record",
"name":"CentralLoggingValue",
"namespace":"com.bell.kafka.wot.model",
"fields":
[
    {"name":"id","type":"string"},
    {"name":"eventtime_est","type":"string"},                    
    {"name":"businesskey","type":["null", "string"]},
    {"name":"systemname","type":"string"},
    {"name":"hostname","type":"string"},
    {"name":"eventname","type":"string"},
    {"name":"log_level","type":"string"},
    {"name":"xmlpayload","type":["null", "string"]},
    {"name":"xmlresponse","type":["null", "string"]},
    {"name":"wait_time_ms","type":["null", "string"]},
    {"name":"async_correlationid","type":["null", "string"]},
    {"name":"resourceskey","type":["null", "string"]},
    {"name":"genericpayload","type":["null", "string"]},
    {"name":"genericresponse","type":["null", "string"]}
]
}

这是我的lof4j2 kafka附加程序:

 <Kafka name="CentralLoggingKafkaAppender" topic="npl01-pub-wfm- wom_nrt_central_logging-t-uat">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">myserver:9092</Property>
<Property name = "value.serializer">io.confluent.kafka.serializers.KafkaAvroSerializer</Property>
<Property name = "schema.registry.url">http://myurl:port</Property> 
</Kafka>

您知道发生了什么吗?

© www.soinside.com 2019 - 2024. All rights reserved.