变更数据捕获策略

变更数据捕获策略利用 Neo4j 和 Aura Enterprise 5 提供的变更数据捕获功能,是源连接器实例的首选策略,因为它不需要任何模式更改,并且可以可靠地捕获删除操作。在将源实例配置为使用此策略之前,请务必按照变更数据捕获 > 入门中所述采取必要步骤。

为了配置此策略,您需要定义模式和选择器,这些模式和选择器描述了您希望跟踪哪些节点或关系的更改,并将它们分配给主题。

当数据库在本地部署中从备份恢复或从快照恢复,或在 Neo4j Aura 中暂停并恢复时,现有的变更标识符将不再起作用,您需要从头开始重新配置您的源实例。有关更多信息,请参阅变更数据捕获 > 恢复备份和快照

配置

首先,您需要为连接器实例选择 CDC 策略;

"neo4j.source-strategy": "CDC"

其次,您需要定义您的模式并将它们映射到您的主题;

"neo4j.cdc.topic.my-topic.patterns": "(:Person),(:Person)-[:KNOWS]-(:Person)"

虽然上述配置是为了方便起见而提供的,但如果您需要为变更事件定义其他过滤器,例如操作、更改的属性名称或元数据字段,您将需要使用如下所示的索引配置方法;

"neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)", (1)
"neo4j.cdc.topic.my-topic.patterns.0.operation": "create", (2)
"neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname", (3)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j", (4)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j", (5)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales", (6)
"neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
"neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
"neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
1 用于识别要监控更改的图实体的一个单一模式。
2 我们感兴趣的单一操作,可以是 create(创建)、update(更新)或 delete(删除)。
3 需要更新才能返回变更消息的属性列表。属性的添加和删除也算作更新。
4 执行更改的已验证用户。
5 执行更改的用户。通常与已验证用户相同,但如果使用模拟,则可能会有所不同。
6 需要与执行更改的事务的事务元数据匹配的键值对。
在上述示例中,只有 pattern 设置是强制性的,其他都是可选的,可以根据您的要求添加。

创建源实例

根据上述示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其保存为名为 source.cdc.neo4j.json 的文件到本地目录中。

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}

我们现在将通过调用以下 REST API 来创建源实例

curl -X POST https://:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.cdc.neo4j.json

这将创建一个 Kafka Connect 源实例,该实例将使用您首选的序列化格式,将与所提供选择器匹配的变更事件消息发送到 my-topic 主题。在 Control Center 中,确认源连接器已在 Connect 选项卡下的 connect-default 中创建。

模式

节点模式

节点模式的定义类似于 Cypher 节点模式。

  1. ( 开头。

  2. [可选] 定义可选的标签列表,用 : 分隔,例如 :Person:Person:Employee

  3. [可选] 以 { 打开属性部分。

    1. [可选] 定义用作键过滤器的属性及其值,格式为 key: value。可以定义多个属性,并且必须用 , 分隔。这些属性必须与 NODE KEY 约束属性对应。

    2. 以下任一;

      1. 无或 *,表示将 JSON 对象中的所有属性分配给节点。

      2. 要从 JSON 对象分配给节点的属性名称列表,用 , 分隔。

      3. 分配给节点的属性名称列表,每个名称前缀为 -,并用 , 分隔,JSON 对象中的所有其他属性都将分配给节点。

    3. } 关闭属性部分。

  4. ) 结尾。

您不能混用包含和排除属性,因此您的模式必须包含所有排除属性或所有包含属性。

示例

  • 选择任意节点上的所有更改。

    ()
  • 选择带有标签 :User 的节点上的所有更改。

    (:User)
  • 选择同时带有标签 :User:Employee 的节点上的所有更改。

    (:User:Employee)
  • 选择带有标签 :User 的节点上的所有更改,且仅在变更事件中包含 namesurname 属性。

    (:User{name, surname})
  • 选择带有标签 :User 的节点上的所有更改,并在变更事件中排除 adressdob 属性。

    (:User{-address, -dob})
  • 选择带有标签 :User 且键属性 userId 等于 1001 的节点上的所有更改,并在变更事件中包含 namesurname 属性。

    (:User{userId: 1001, name, surname})
    此示例要求 :User 标签的 userId 属性上存在 NODE KEY 约束。
  • 选择同时带有标签 :UserEmployee,且键属性 name 等于 johnsurname 等于 doe 的节点上的所有更改。

    (:User:Employee{name: 'john', surname: 'doe'})
    此示例要求 namesurname 属性上存在 NODE KEY 约束,无论是针对 :User 还是 :Employee 标签,或两者都包含。

关系模式

关系模式的定义类似于 Cypher 关系模式。

  1. 起始节点的节点模式,不包含任何属性包含或排除列表。

  2. -[.

  3. 定义关系类型,以 : 作为前缀,例如 :BOUGHT:KNOWS

  4. [可选] 以 { 打开属性部分。

    1. [可选] 定义用作键过滤器的属性及其值,格式为 key: value

    2. 以下任一;

      1. 无或 *,表示将 JSON 对象中的所有属性分配给节点。

      2. 要从 JSON 对象分配给节点的属性名称列表,用 , 分隔。

      3. 分配给节点的属性名称列表,每个名称前缀为 -,并用 , 分隔,JSON 对象中的所有其他属性都将分配给节点。

    3. } 关闭属性部分。

  5. ]->.

  6. 结束节点的节点模式,不包含任何属性包含或排除列表。

您不能混用包含和排除,因此您的模式必须包含所有排除属性或所有包含属性。

示例

  • 选择 :BOUGHT 关系上的所有更改。

    ()-[:BOUGHT]->()
  • 选择起始节点带有标签 :User 且结束节点带有标签 :Product:BOUGHT 关系上的所有更改。

    (:User)-[:BOUGHT]->(:Product)
  • 选择起始节点带有标签 :User:Employee 且结束节点带有标签 :Product:BOUGHT 关系上的所有更改。

    (:User:Employee)-[:BOUGHT]->(:Product)
  • 选择起始节点带有标签 :User 且结束节点带有标签 :Product:BOUGHT 关系上的所有更改,且仅在变更事件中包含 pricecurrency 属性。

    (:User)-[:BOUGHT{price, currency}]->(:Product)
  • 选择起始节点带有标签 :User 且结束节点带有标签 :Product:BOUGHT 关系上的所有更改,并在变更事件中排除 card 属性。

    (:User)-[:BOUGHT{-card}]->(:Product)
  • 选择由键属性 contractId 等于 5910 标识的 :WORKS_FOR 关系上的所有更改。

    ()-[:WORKS_FOR{contractId: 5910}]->()
    此示例要求 :WORKS_FOR 关系类型的 contractId 属性上存在 RELATIONSHIP KEY 约束。
  • 选择由键属性 contractId 等于 5910 标识的 :WORKS_FOR 关系上的所有更改,并在变更事件中排除 salary 属性。

    ()-[:WORKS_FOR{contractId: 5910,-salary}]->()
  • 选择从带有标签 :User 且由键属性 userId 等于 1001 标识的节点开始的关系上的所有更改。

    (:User{userId: 1001})-[]->()
© . All rights reserved.