我正在使用 Neo4j Source Connector for Kafka (https://neo4j.com/docs/kafka/kafka-connect/source/) 来生成 Kafka 主题的更改。我的配置如下:
{
"connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
"neo4j.authentication.basic.password": "*****",
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.source.query": "MATCH (c:Customer) WHERE c.timestamp > $lastCheck RETURN c.name as name, c.age as age, c.timestamp as timestamp",
"neo4j.enforce.schema": "true",
"name": "neo4j-source-connector",
"neo4j.authentication.basic.username": "neo4j",
"topic": "neo4j-test-AVRO",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"neo4j.streaming.poll.interval.msecs": "5000",
"neo4j.streaming.from": "LAST_COMMITTED"
}
每当我使用以下代码在 Neo4j 中插入节点时:
CREATE (c1:Customer {name: 'Test',age:null, timestamp: timestamp()})
任何帮助将不胜感激。谢谢
我使用了 JSONSchema (io.confluence.connect.json.JsonSchemaConverter) 但遇到了同样的错误。不过,事情可以与字符串转换器(org.apache.kafka.connect.storage.StringConverter)一起使用。
根据您的需求,以下是通过更改“neo4j.source.query”可以实现的一些可能的解决方法:
过滤掉没有
Customer
属性的 age
节点:
MATCH (c:Customer) WHERE c.age IS NOT NULL AND c.timestamp > $lastCheck RETURN c.name AS name, c.age AS age, c.timestamp AS timestamp
使用特殊值(例如-1)而不是 NULL
age
:
MATCH (c:Customer) c.timestamp > $lastCheck RETURN c.name AS 名称,COALESCE(c.age, -1) AS 年龄,c.timestamp AS 时间戳