I have created an Apache Kafka cluster for a project (currently 3 brokers, 3 controllers, 1 Kafka Connect worker), with several topics receiving data from a third party. The worker uses the Confluent Elasticsearch sink connector plugin.
I want to understand where my configuration, or my understanding of the data, has gone wrong. I'm fairly new to Apache Kafka and still a fledgling developer. I think I have a good grasp of the technology overall, but the Kafka ecosystem makes my head spin.
Sample console-consumer output for one topic. All of them are formatted similarly, with no schema.
{
"location": "Battery Tester1",
"dc": "16.20V",
"ac": "12.01V",
"curr": " 0.00A",
"temperature": "32.00C",
"status": [
"Currently on AC power"
]
}
{
"location": "Battery Tester2",
"dc": "16.10V",
"ac": "11.01V",
"curr": " 2.00A",
"temperature": "34.00C",
"status": [
"Currently on AC power"
]
}
{
"location": "Battery Tester3",
"status": [
"Currently on AC power"
]
}
My connect-standalone.properties is:
bootstrap.servers=kafbrk01-4:9092,kafbrk01-5:9092,kafbrk01-6:9092
config.storage.topic: es-connect-kafwrk01-configs
offset.storage.topic: es-connect-kafwrk01-offsets
status.storage.topic: es-connect-kafwrk01-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
and the quickstart-elasticsearch.properties used with the plugin:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=Power,Router,Gateway
key.ignore=true
connection.url=https://<FQDN to es01>:9200,https://<FQDN to es02>:9200,https://<FQDN to es03>:9200
connection.username=es_sink_connector_user
connection.password=FakePasswordBecause!
type.name=kafka-connect
elastic.security.protocol = PLAINTEXT
schema.ignore=true
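For completeness, the standalone worker is launched with both files in the usual way (paths match my install under /opt/kafka; adjust as needed):

/opt/kafka/bin/connect-standalone.sh connect-standalone.properties quickstart-elasticsearch.properties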
When running with this configuration, I get the following error stack:
[2024-04-24 22:06:15,672] ERROR [elasticsearch-sink|task-0] WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:533)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:533)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"key"; line: 1, column: 4]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:69)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:331)
... 18 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"key"; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:67)
... 19 more
When I was originally building and testing the Kafka cluster, I wrote a small Python application to write entries into it, and the sink worked with that data. I'm confused, but based on my reading/research I suspect this may be a JSON formatting issue? What I see looks like valid JSON to me, but does the error indicate it isn't, and the console consumer is simply displaying it this way by design?
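As a sanity check on the JSON itself, piping one captured record through a parser is a quick test (sample.json is a hypothetical file holding a single entry copied from the consumer output):

python3 -m json.tool sample.json
# or, if jq is available:
jq . sample.json

Both parse the pretty-printed records above without complaint, which is part of what confuses me.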
I also tried using
org.apache.kafka.connect.storage.StringConverter
for both key and value, but that created empty indices and, after 6 retries, failed with an error citing org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes.
Update 2024/4/25: As an exercise, I dumped each topic's contents into a separate file (> /tmp/) and inserted
{"create":{"_index":"<topicname>"}}
above each entry. Attempting a bulk import into Elasticsearch with curl failed with a similar error. When I reflowed each pretty-printed entry onto a single line, the import succeeded. Does that help indicate anything? Perhaps the data is being treated as per-line string data rather than actual JSON?
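From the Elasticsearch docs, _bulk expects newline-delimited JSON (each action line and each document on exactly one line, Content-Type: application/x-ndjson), so the pretty-printed dump failing there may say more about _bulk than about Kafka. For reference, the shape that imported successfully, using the same credentials as the sink config (-k only because my nodes use self-signed certificates; ES index names must be lowercase):

# power.ndjson: action line, then the whole document on a single line
# (the file must end with a trailing newline)
{"create":{"_index":"power"}}
{"location":"Battery Tester1","dc":"16.20V","ac":"12.01V","curr":" 0.00A","temperature":"32.00C","status":["Currently on AC power"]}

curl -k -u 'es_sink_connector_user:FakePasswordBecause!' -H "Content-Type: application/x-ndjson" -X POST "https://<FQDN to es01>:9200/_bulk" --data-binary @power.ndjson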
Any guidance, tips, or feedback would be greatly appreciated. Either way, thank you for your time.
After research and further testing, I found that in my scenario the data sent via the third party was being produced with a plain-string key ("key"), which was not obvious to newbie me.
I only realized this when I ran a local console consumer with the print-key option enabled:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafbrk01-4:9092 --topic Power --property=print.key=true --from-beginning
This showed up as:
key {
"location": "Battery Tester1",
"dc": "16.20V",
"ac": "12.01V",
"curr": " 0.00A",
"temperature": "32.00C",
"status": [
"Currently on AC power"
]
}
key {
"location": "Battery Tester2",
"dc": "16.10V",
"ac": "11.01V",
"curr": " 2.00A",
"temperature": "34.00C",
"status": [
"Currently on AC power"
]
}
key {
"location": "Battery Tester3",
"status": [
"Currently on AC power"
]
}
With this in mind, and hoping the StringConverter would handle these keys, I switched the connect-standalone.properties configuration to:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
After that, I watched the Connect worker start up, and it successfully processed the data from each topic and pushed it to the ELK cluster.
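A quick way to confirm documents are landing is to count them against the connector's default topic-to-index mapping (the lowercased topic name, so the Power topic becomes the power index; assuming that default here):

curl -k -u 'es_sink_connector_user:FakePasswordBecause!' "https://<FQDN to es01>:9200/power/_count?pretty"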
Hopefully this helps someone in the future.