我在 Neo4j 实例中启用了 CDC,并设置了一个 Kafka 主题来监听 Neo4j 数据库中的数据更改。根据 Neo4j 文档,事件数据应该采用键值对格式,但我收到一个更复杂的结构,每个属性都有附加字段。
这是我从 Kafka 主题收到的数据示例:
{
"id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
"txId": 1072,
"seq": 2,
"event": {
"elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
"eventType": "NODE",
"operation": "CREATE",
"labels": [
"Environment"
],
"keys": {},
"state": {
"before": null,
"after": {
"labels": [
"Environment"
],
"properties": {
"name": {
"B": null,
"I64": null,
"F64": null,
"S": "Dev",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"id": {
"B": null,
"I64": null,
"F64": null,
"S": "78b90e78-9b79-4330-9d02-7895f349964b",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
}
}
}
}
}
}
我的neo4j服务器版本是5.22,Kafka连接版本是5.1.1
Kafka 连接器配置:
{
"name": "neo4j-source-connector",
"config": {
"connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"neo4j.uri": "bolt://localhost:7687",
"neo4j.streaming.from": "ALL",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.source-strategy": "CDC",
"neo4j.start-from": "NOW",
"neo4j.cdc.poll-interval": "500ms",
"neo4j.cdc.poll-duration": "5s",
"neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)",
}
}
请注意,对于属性名称和 id,我看到的不是简单的键值格式,而是带有
B
、I64
、S
等字段的结构。我期望的是这样的:
"id": "78b90e78-9b79-4330-9d02-7895f349964b"
是否有我缺少的配置或其他可以解释为什么我收到这种格式的数据的配置?如何将其简化为键值对?
您所看到的内容已在连接器的 5.1.0-rc02 版本中引入,该版本已被推广为普遍可用(5.1.0 及更高版本)。 简而言之,这可以确保发布到 Kafka 主题的更改事件始终遵循相同的注册消息模式,即使特定属性在 Neo4j 端更改了其类型(例如:
:Environment(id)
从整数更改为字符串)。
因此,所有节点/关系属性都可以是受支持的 Neo4j 类型,因此具有类型缩写和相应属性值的结构体或null
(“B”代表布尔值,“I64”代表 int64/long,“F64”代表float64/double,“S”代表字符串...)
如果您使用源连接器的早期版本尝试此方案,则当对主题消息强制实施架构时(在发布时间或消费时间,具体取决于您的配置),此类属性类型更改将导致错误。