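Cypher strategy

This strategy executes the corresponding Cypher statement for every message it receives.

To configure the Cypher strategy for the desired topics, you must follow this convention: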

"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

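Starting with version 5.1.0 of the Neo4j Kafka Connector, the Cypher strategy binds the header, key, and value of each message as __header, __key, and __value respectively. These are passed to the user-provided Cypher query as predefined variables. See the Cypher strategy settings for more information on how to customize the variable names.

For backwards compatibility, event can still be used, but it corresponds only to the value of the message.

Example

Assume that you have configured the topics your sink connector subscribes to in the sink connector configuration settings as follows: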

  "topics": "creates,updates,deletes"

You need to declare that you want to use the Cypher strategy and provide a Cypher statement for each topic, similar to the following:

  "topics": "creates,updates,deletes",
  "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"

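The configuration excerpt above defines that:

  • Messages received from the creates topic are unpacked by the sink connector into Neo4j using the following Cypher query: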

    WITH __value.event.state.after AS state
    MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
    MERGE (f:Family {name: state.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • Messages received from the updates topic are unpacked by the sink connector into Neo4j using the following Cypher query:

    WITH __value.event.state.before AS before, __value.event.state.after AS after
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    MATCH (fPre:Family {name: before.properties.surname})
    OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre)
    DELETE b
    WITH after, p
    SET p.name = after.properties.name, p.surname = after.properties.surname
    MERGE (f:Family {name: after.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • Messages received from the deletes topic are unpacked by the sink connector into Neo4j using the following Cypher query:

    WITH __value.event.state.before AS before
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    DETACH DELETE p
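
Each query above reads the message value through a path such as __value.event.state.after.properties.name. The sketch below illustrates what such a path yields. The sample message is hypothetical: only its nesting (event, state, before/after, properties) is implied by the queries, and the field values are invented. The resolve helper is not part of the connector; it only imitates how Cypher returns null when a property is read from null.

```python
# Hypothetical value of a message on the "creates" topic. Only the nesting is
# implied by the queries above; the field values are invented for illustration.
sample_value = {
    "event": {
        "state": {
            "before": None,
            "after": {"properties": {"name": "john", "surname": "doe"}},
        }
    }
}


def resolve(root, path):
    """Follow a dotted path through nested dicts, returning None on a missing step."""
    current = root
    for part in path.split("."):
        if not isinstance(current, dict):
            return None
        current = current.get(part)
    return current


# The "creates" query starts with: WITH __value.event.state.after AS state
state = resolve({"__value": sample_value}, "__value.event.state.after")
person_name = resolve(state, "properties.name")
family_name = resolve(state, "properties.surname")
```

With this sample, the creates query would merge a Person node using person_name and family_name and a Family node using family_name.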

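Creating the sink instance

Based on the example above, you can use one of the following configurations. Pick one of the message serialization formats below (AVRO, JSON Schema, or Protobuf) and save it as a file named sink.cypher.neo4j.json in a local directory.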

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
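
JSON strings cannot span lines, which is why the Cypher statements in the configurations above are written as single long strings. If you prefer to maintain the queries in multi-line form, one option is to generate the file with a small script. The following is a minimal sketch that mirrors the AVRO configuration; the helper names (one_line, build_config, write_config) are illustrative and not part of the connector.

```python
import json

# Multi-line versions of the three queries from the example above.
QUERIES = {
    "creates": """
        WITH __value.event.state.after AS state
        MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
        MERGE (f:Family {name: state.properties.surname})
        MERGE (p)-[:BELONGS_TO]->(f)
    """,
    "updates": """
        WITH __value.event.state.before AS before, __value.event.state.after AS after
        MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
        MATCH (fPre:Family {name: before.properties.surname})
        OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre)
        DELETE b
        WITH after, p
        SET p.name = after.properties.name, p.surname = after.properties.surname
        MERGE (f:Family {name: after.properties.surname})
        MERGE (p)-[:BELONGS_TO]->(f)
    """,
    "deletes": """
        WITH __value.event.state.before AS before
        MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
        DETACH DELETE p
    """,
}


def one_line(query):
    """Collapse every run of whitespace into a single space.

    Caution: this would also alter whitespace inside Cypher string literals;
    the queries above contain none.
    """
    return " ".join(query.split())


def build_config(name, queries):
    """Build the connector definition, with one Cypher entry per topic."""
    config = {
        "topics": ",".join(queries),  # dict keys, in insertion order
        "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "neo4j.uri": "neo4j://neo4j:7687",
        "neo4j.authentication.type": "BASIC",
        "neo4j.authentication.basic.username": "neo4j",
        "neo4j.authentication.basic.password": "password",
        "neo4j.cypher.bind-header-as": "",
        "neo4j.cypher.bind-key-as": "",
        "neo4j.cypher.bind-value-as": "__value",
        "neo4j.cypher.bind-value-as-event": False,
    }
    for topic, query in queries.items():
        config[f"neo4j.cypher.topic.{topic}"] = one_line(query)
    return {"name": name, "config": config}


def write_config(path, connector):
    """Write the connector definition as JSON to the given path."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(connector, handle, indent=2)


connector = build_config("Neo4jSinkConnectorAVRO", QUERIES)
```

Calling write_config("sink.cypher.neo4j.json", connector) would then produce a file equivalent to the AVRO example.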

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cypher.neo4j.json

Now you can access your Confluent Control Center instance at http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.
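
The connector state can also be read from the Kafka Connect REST API, which exposes a status endpoint for each connector. The sketch below assumes the worker listens on http://localhost:8083, as in the REST call above, and that the connector was created under one of the names from the example configurations. The function names are illustrative.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"  # assumed: same worker address as the REST call above


def fetch_status(name, base_url=CONNECT_URL):
    """Call GET /connectors/{name}/status on the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def all_running(status):
    """Return True when the connector and each of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

For example, all_running(fetch_status("Neo4jSinkConnectorAVRO")) checks the AVRO connector from the configurations above. A FAILED task state usually comes with a trace field in the same response that is worth inspecting.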

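Behind the scenes, the connector creates a batch of changes for each topic and executes the query with a prepended UNWIND clause. For the example above, the query executed for messages received from the creates topic looks similar to: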

UNWIND $events AS message
WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value
WITH __value.event.state.after AS state
MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
MERGE (f:Family {name: state.properties.surname})
MERGE (p)-[:BELONGS_TO]->(f)

where $events is the batch of change events.
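
To make the wrapping step concrete, the following sketch assembles a statement of the same shape from a user query and the variable names. It is not the connector's actual code: wrap_query and its parameters are hypothetical, modeled only on the query shown above and on the neo4j.cypher.bind-* settings that appear in the example configurations.

```python
CREATES_QUERY = (
    "WITH __value.event.state.after AS state\n"
    "MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})\n"
    "MERGE (f:Family {name: state.properties.surname})\n"
    "MERGE (p)-[:BELONGS_TO]->(f)"
)


def wrap_query(user_query, header_var="__header", key_var="__key",
               value_var="__value", bind_value_as_event=True):
    """Prepend the UNWIND clause and variable bindings to a user query.

    Hypothetical helper: it reproduces the shape of the statement shown
    above, not the connector's internal implementation.
    """
    bindings = []
    if bind_value_as_event:
        # Backwards-compatible alias: event refers to the message value only.
        bindings.append("message.value AS event")
    bindings += [
        f"message.header AS {header_var}",
        f"message.key AS {key_var}",
        f"message.value AS {value_var}",
    ]
    return (
        "UNWIND $events AS message\n"
        "WITH " + ", ".join(bindings) + "\n"
        + user_query
    )


wrapped = wrap_query(CREATES_QUERY)
```

Because the whole batch is passed as the single $events parameter, one statement handles every message of the batch for a topic.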