Cypher 策略
此策略对接收到的每条消息执行相应的 Cypher 语句。
要为所需主题配置 Cypher 策略,您必须遵循以下约定
"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"
从 Neo4j Kafka 连接器 5.1.0 版本开始,Cypher 策略将消息的标题、键和值分别绑定为 为了向后兼容, |
示例
假设您在接收连接器配置设置中配置了接收连接器订阅的主题,如下所示;
"topics": "creates,updates,deletes"
您需要声明要使用 cypher
策略,并为每个主题提供相应的 Cypher 语句,类似于以下内容;
"topics": "creates,updates,deletes",
"neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"
上面的配置摘录定义了;
-
从
creates
主题接收的消息将由接收连接器使用以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
-
从
updates
主题接收的消息将由接收连接器使用以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
-
从
deletes
主题接收的消息将由接收连接器使用以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p
创建接收实例
根据上面的示例,您可以使用以下配置之一。选择消息序列化格式示例之一,并将其另存为名为 sink.cypher.neo4j.json
的文件到本地目录中。
使用此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cypher.neo4j.json
现在,您可以在 http://localhost:9021/clusters
下的 Confluent Control Center 实例中访问它。验证配置的连接器实例是否在 connect-default
下的 连接
选项卡中运行。
在幕后,连接器将为每个主题创建一个更改批次,并将使用预先附加的
其中 |