Cypher 策略
此策略为每条收到的消息执行相应的 Cypher 语句。
要为所需主题配置 Cypher 策略,您必须遵循以下约定
"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"
从 Neo4j Kafka 连接器的 5.1.0 版本开始,Cypher 策略将消息的标头、键和值分别绑定为 为了向后兼容, |
示例
假设您在接收器配置设置中订阅了以下主题:
"topics": "creates,updates,deletes"
您需要声明您要使用 cypher
策略,并为每个主题提供相应的 Cypher 语句,如下所示:
"topics": "creates,updates,deletes",
"neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"
上述配置摘录定义了:
-
从
creates
主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
-
从
updates
主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
-
从
deletes
主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p
创建接收器实例
基于以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其保存为名为 sink.cypher.neo4j.json
的文件到本地目录中。
使用此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cypher.neo4j.json
现在您可以通过 http://localhost:9021/clusters
访问您的 Confluent Control Center 实例。在 connect-default
下的 Connect
选项卡中验证已配置的连接器实例是否正在运行。
在底层,连接器将为每个主题创建一批更改,并使用预置的
其中 |