Neo4j 似乎围绕 cypher 命令包装了 CALL 查询

问题描述 投票:0回答:1

我第一次在独立机器上使用 Kafka Connect,本地托管的 kafka 代理和 Neo4j 数据库运行在同一台机器上。我正在尝试使用自定义密码查询将数据摄取到 neo4j 中,但是出现以下错误。->

ERROR [neo4j-sink|task-0] WorkerSinkTask{id=neo4j-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.executeFailed(RetryWithToleranceOperator.java:116)
        at org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.report(WorkerErrantRecordReporter.java:109)
        at org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:73)
        at org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 11 more
Caused by: org.neo4j.driver.exceptions.ClientException: Query cannot conclude with CALL (must be a RETURN clause, a FINISH clause, an update clause, a unit subquery call, or a procedure call with no YIELD). (line 1, column 165 (offset: 164))
"UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Test {timestamp: __value.timestamp}) RETURN n}"

我的 kafka connect 配置文件只是一个测试文件,其中包含以下内容 ->

name=neo4j-sink                                                                 
connector.class=org.neo4j.connectors.kafka.sink.Neo4jConnector                  
#connector.class=streams.kafka.connect.sink.Neo4jSinkConnector                  
tasks.max=1                                                                     
                                                                                
# Kafka configuration                                                           
bootstrap.servers=localhost:9092                                                
key.converter=io.confluent.connect.avro.AvroConverter                           
value.converter=io.confluent.connect.avro.AvroConverter                         
key.converter.schema.registry.url=http://localhost:8081                         
value.converter.schema.registry.url=http://localhost:8081                       
                                                                                
errors.tolerance=none                                                           
errors.log.enable=true                                                          
errors.log.include.messages=true                                                
                                                                                
# Topic configuration                                                           
topics=<topic_name>
# Neo4j connection                                                              
neo4j.uri=bolt://localhost:7687                                                 
neo4j.authentication.basic.username=<usename>                                       
neo4j.authentication.basic.password=<pass>                                    
                                                                                
# Simplified Cypher query that just creates a Check node for each message       
neo4j.cypher.topic.<topic_name>0=CREATE (n:Test {timestamp: __value.timestamp}) RETURN n

有人可以帮助我解释为什么我的查询被 CALL 语句包围吗?它如何期望我给予同样的回报。还是我错过了什么?

这个文件只是一个测试文件,我希望为每条消息形成一个 TEST 类型的新节点。我原来的密码查询非常复杂,还涉及多个嵌套调用。

apache-kafka neo4j apache-kafka-connect
1个回答
0
投票

请参阅 Kafka 连接器的 Cypher 策略 的文档。它显示了一个包含 3 个主题的示例,其中之一是

creates
。文档底部是连接器生成的代码示例,用于执行文档中之前为
creates
主题配置的 Cypher 代码 - 它也以
UNWIND $events AS message
开头。

您的配置文件正在定义主题

<topic_name>
,这似乎是错误的。另外,您的配置文件定义了主题
<topic_name>0
的 Cypher 代码(注意末尾额外的“0”),这似乎更错误。

确保仔细遵循文档。

© www.soinside.com 2019 - 2024. All rights reserved.