我第一次在独立机器上使用 Kafka Connect,本地托管的 kafka 代理和 Neo4j 数据库运行在同一台机器上。我正在尝试使用自定义密码查询将数据摄取到 neo4j 中,但是出现以下错误。->
ERROR [neo4j-sink|task-0] WorkerSinkTask{id=neo4j-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.executeFailed(RetryWithToleranceOperator.java:116)
at org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.report(WorkerErrantRecordReporter.java:109)
at org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:73)
at org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
... 11 more
Caused by: org.neo4j.driver.exceptions.ClientException: Query cannot conclude with CALL (must be a RETURN clause, a FINISH clause, an update clause, a unit subquery call, or a procedure call with no YIELD). (line 1, column 165 (offset: 164))
"UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Test {timestamp: __value.timestamp}) RETURN n}"
我的 kafka connect 配置文件只是一个测试文件,其中包含以下内容 ->
name=neo4j-sink
connector.class=org.neo4j.connectors.kafka.sink.Neo4jConnector
#connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
tasks.max=1
# Kafka configuration
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
errors.tolerance=none
errors.log.enable=true
errors.log.include.messages=true
# Topic configuration
topics=<topic_name>
# Neo4j connection
neo4j.uri=bolt://localhost:7687
neo4j.authentication.basic.username=<usename>
neo4j.authentication.basic.password=<pass>
# Simplified Cypher query that just creates a Check node for each message
neo4j.cypher.topic.<topic_name>0=CREATE (n:Test {timestamp: __value.timestamp}) RETURN n
有人可以帮助我解释为什么我的查询被 CALL 语句包围吗?它如何期望我给予同样的回报。还是我错过了什么?
这个文件只是一个测试文件,我希望为每条消息形成一个 TEST 类型的新节点。我原来的密码查询非常复杂,还涉及多个嵌套调用。
请参阅 Kafka 连接器的 Cypher 策略 的文档。它显示了一个包含 3 个主题的示例,其中之一是
creates
。文档底部是连接器生成的代码示例,用于执行文档中之前为 creates
主题配置的 Cypher 代码 - 它也以 UNWIND $events AS message
开头。
您的配置文件正在定义主题
<topic_name>
,这似乎是错误的。另外,您的配置文件定义了主题 <topic_name>0
的 Cypher 代码(注意末尾额外的“0”),这似乎更错误。
确保仔细遵循文档。