Cypher 策略
此策略为每条收到的消息执行相应的 Cypher 语句。
要为所需主题配置 Cypher 策略,您必须遵循以下约定
"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"
|
从 Neo4j Kafka 连接器的 5.1.0 版本开始,Cypher 策略将消息的标头、键和值分别绑定为 为了向后兼容, |
示例
假设您在接收器配置设置中订阅了以下主题:
"topics": "creates,updates,deletes"
您需要声明您要使用 cypher 策略,并为每个主题提供相应的 Cypher 语句,如下所示:
"topics": "creates,updates,deletes",
"neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"
上述配置摘录定义了:
-
从
creates主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f) -
从
updates主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f) -
从
deletes主题接收到的消息将由接收器连接器通过以下 Cypher 查询解包到 Neo4j 中WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p
创建接收器实例
基于以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其保存为名为 sink.cypher.neo4j.json 的文件到本地目录中。
使用此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST https://:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cypher.neo4j.json
现在您可以通过 https://:9021/clusters 访问您的 Confluent Control Center 实例。在 connect-default 下的 Connect 选项卡中验证已配置的连接器实例是否正在运行。
|
在底层,连接器将为每个主题创建一批更改,并使用预置的
其中 |