Change Data Capture Strategy

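This strategy ingests CDC events coming from another Neo4j instance. The events are generated either by a source connector instance configured with the Change Data Capture strategy or by the deprecated Neo4j Streams plugin.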

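Change Data Capture events must be generated by the matching version of the source connector, and that connector must be configured with a value converter that supports schemas.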

Two sub-strategies are available:

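  • The Schema strategy merges nodes and relationships based on the constraints defined in the source database (node key, relationship key, and/or property uniqueness plus existence).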

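  • The Source ID strategy merges nodes and relationships based on the elementId field (Change Data Capture events) or the id field (Neo4j Streams events), which is the internal Neo4j entity identifier.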

Schema sub-strategy

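The Schema strategy merges nodes and relationships using the constraints declared in the change events, which preserves the structure of the source schema.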

To configure this strategy, declare the list of topics to read change events from.

"neo4j.cdc.schema.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

Example

Assume you have configured the topics your sink connector subscribes to as follows:

"topics": "topic.1,topic.2"

To use the cdc.schema strategy, provide the list of topics to consume change events from.

"neo4j.cdc.schema.topics": "topic.1,topic.2"
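As a quick sanity check, the relationship between the two settings can be sketched in Python. The helper names `split_topics` and `unsubscribed_cdc_topics` are hypothetical and not part of the connector. The sketch assumes that every topic listed under a strategy key should also appear in `topics`.

```python
def split_topics(value: str) -> list[str]:
    """Split a comma-separated topic list, ignoring blanks and whitespace."""
    return [t.strip() for t in value.split(",") if t.strip()]


def unsubscribed_cdc_topics(config: dict[str, str], strategy_key: str) -> list[str]:
    """Return strategy topics that the connector is not subscribed to."""
    subscribed = set(split_topics(config.get("topics", "")))
    return [t for t in split_topics(config.get(strategy_key, "")) if t not in subscribed]


config = {
    "topics": "topic.1,topic.2",
    "neo4j.cdc.schema.topics": "topic.1,topic.2",
}
# An empty list means every strategy topic is also a subscribed topic.
missing = unsubscribed_cdc_topics(config, "neo4j.cdc.schema.topics")
print(missing)
```

The same check applies unchanged to any other strategy key, since it only compares two comma-separated lists.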

Each change event is then projected into a graph entity.

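Consider this node creation event. The first payload below is the event in the Change Data Capture format; the second is the corresponding event in the format of the deprecated Neo4j Streams plugin.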

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

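The node is persisted as follows. The sink connector uses the keys field (Change Data Capture events) or the schema field (Neo4j Streams events) to insert or update nodes, without adding extra properties or labels.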

(:Person {first_name: "John", last_name: "Doe", email: "john.doe@example.com"})
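To make the projection more concrete, here is a minimal Python sketch of how a Cypher `MERGE` could be derived from the keys field of a Change Data Capture node event. The function name `node_merge_from_cdc` is hypothetical, and the generated query is only an illustration of the idea, not the statement the connector actually runs. The sketch uses the first label that declares a key and ignores any other labels.

```python
def node_merge_from_cdc(event: dict) -> tuple[str, dict]:
    """Build an illustrative MERGE statement and parameters for a CDC node event."""
    # Pick the first label that has at least one key declared, and its first key set.
    label, key_sets = next((l, k) for l, k in event["keys"].items() if k)
    keys = key_sets[0]
    pattern = ", ".join(f"`{name}`: $keys.`{name}`" for name in keys)
    query = f"MERGE (n:`{label}` {{{pattern}}}) SET n += $properties"
    params = {"keys": keys, "properties": event["state"]["after"]["properties"]}
    return query, params


# The "event" part of the node creation event shown above, trimmed.
node_event = {
    "keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]},
    "labels": ["Person"],
    "state": {
        "before": None,
        "after": {
            "labels": ["Person"],
            "properties": {
                "email": "john.doe@example.com",
                "first_name": "John",
                "last_name": "Doe",
            },
        },
    },
}
query, params = node_merge_from_cdc(node_event)
print(query)
```

Merging on the key properties rather than on all properties is what lets a later update event find the same node again.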

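Consider this relationship creation event. As before, the first payload is in the Change Data Capture format and the second is in the Neo4j Streams format.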

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

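The relationship is persisted as follows. The sink connector uses the keys field of the start and end nodes in the change event to create or update the relationship, again without adding extra properties or labels.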

(:Person {last_name: "Doe", first_name: "John"})-[:KNOWS {since: "2012-01-01"}]->(:Person {last_name: "Doe", first_name: "Mary"})
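The relationship case can be sketched in the same spirit: the start and end nodes are each matched by their own keys, and the relationship is merged between them. The helper names `node_pattern` and `relationship_merge_from_cdc` are hypothetical, and the query is an illustration rather than the connector's actual Cypher.

```python
def node_pattern(var: str, node: dict) -> tuple[str, dict]:
    """Build a node pattern keyed on the first declared key set of `node`."""
    label, key_sets = next((l, k) for l, k in node["keys"].items() if k)
    keys = key_sets[0]
    props = ", ".join(f"`{name}`: ${var}.`{name}`" for name in keys)
    return f"({var}:`{label}` {{{props}}})", keys


def relationship_merge_from_cdc(event: dict) -> tuple[str, dict]:
    """Build an illustrative MERGE for a CDC relationship event."""
    start_pattern, start_keys = node_pattern("s", event["start"])
    end_pattern, end_keys = node_pattern("e", event["end"])
    query = (
        f"MERGE {start_pattern} MERGE {end_pattern} "
        f"MERGE (s)-[r:`{event['type']}`]->(e) SET r += $properties"
    )
    params = {
        "s": start_keys,
        "e": end_keys,
        "properties": event["state"]["after"]["properties"],
    }
    return query, params


# The "event" part of the relationship creation event shown above, trimmed.
rel_event = {
    "type": "KNOWS",
    "start": {"keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]}},
    "end": {"keys": {"Person": [{"first_name": "Mary", "last_name": "Doe"}]}},
    "keys": [],
    "state": {"before": None, "after": {"properties": {"since": "2012-01-01"}}},
}
rel_query, rel_params = relationship_merge_from_cdc(rel_event)
print(rel_query)
```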

Creating the sink instance

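Based on the examples above, you can use one of the following configurations. Pick one of the message serialization format examples (Avro, JSON Schema, or Protobuf) and save it as a file named sink.cdc.schema.neo4j.json in a local directory.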

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.schema.neo4j.json
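If you prefer to script the call, the request above could be reproduced with the Python standard library roughly as follows. The function name `build_create_request` is hypothetical. The URL and headers are taken from the curl command.

```python
import json
import urllib.request


def build_create_request(body: bytes, connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the POST request that registers a connector from its JSON definition."""
    json.loads(body)  # fail early if the payload is not valid JSON
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


# Usage, assuming the file from the previous step exists and Kafka Connect is reachable:
#
#   with open("sink.cdc.schema.neo4j.json", "rb") as f:
#       request = build_create_request(f.read())
#   with urllib.request.urlopen(request) as response:
#       print(response.status, response.read().decode())
```

Building the request separately from sending it keeps the JSON validation testable without a running Kafka Connect instance.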

You can now access your Confluent Control Center instance at http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

Source ID sub-strategy

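The Source ID strategy merges nodes and relationships based on the elementId (or, for Neo4j Streams events, id) value of the source entity. It stores this value as an explicit property on the target nodes and relationships and marks the nodes with an explicit label.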

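To configure this strategy, declare the list of topics to read change events from. You can also set an optional label name to use as the marker and an optional property name to store the source entity's elementId (or id) value.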

"neo4j.cdc.source-id.topics": "<comma-separated list of topics>"
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>"
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"
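Because the label and property names are optional, the effective settings can be thought of as the configured values layered over the documented defaults. The following Python sketch illustrates that. The helper name `effective_source_id_settings` is hypothetical, and the default values are the ones stated in the placeholders above.

```python
# Defaults as stated in the configuration placeholders above.
SOURCE_ID_DEFAULTS = {
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId",
}


def effective_source_id_settings(config: dict[str, str]) -> dict[str, str]:
    """Return the label and property names, falling back to the defaults."""
    return {key: config.get(key, default) for key, default in SOURCE_ID_DEFAULTS.items()}


settings = effective_source_id_settings({"neo4j.cdc.source-id.topics": "topic.1,topic.2"})
print(settings)
```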

Example

Assume you have configured the topics your sink connector subscribes to as follows:

"topics": "topic.1,topic.2"

To use the cdc.source-id strategy, provide the list of topics to consume change events from.

"neo4j.cdc.source-id.topics": "topic.1,topic.2"

Each change event is then projected into a graph entity.

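Consider this node creation event. The first payload below is the event in the Change Data Capture format; the second is the corresponding event in the format of the deprecated Neo4j Streams plugin.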

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

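The node is persisted as follows. The sink connector uses the elementId field (Change Data Capture events) or the id field (Neo4j Streams events) of the node change event to create or update the node. The first line shows the result for the Change Data Capture event, the second for the Neo4j Streams event.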

(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})
(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "1004"})
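A minimal Python sketch of this projection for a Change Data Capture node event follows. The function name `node_merge_by_source_id` is hypothetical and the query is an illustration, not the connector's actual Cypher. It merges on the marker label and the source identifier property, then applies the event's own labels and properties.

```python
def node_merge_by_source_id(
    event: dict, label_name: str = "SourceEvent", property_name: str = "sourceId"
) -> tuple[str, dict]:
    """Build an illustrative MERGE keyed on the source entity's elementId."""
    after = event["state"]["after"]
    query = (
        f"MERGE (n:`{label_name}` {{`{property_name}`: $sourceId}}) "
        "SET n += $properties"
    )
    labels = "".join(f":`{label}`" for label in after["labels"])
    if labels:  # only add the SET clause when the event carries labels
        query += f" SET n{labels}"
    return query, {"sourceId": event["elementId"], "properties": after["properties"]}


# The "event" part of the node creation event shown above, trimmed.
source_node_event = {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "state": {
        "before": None,
        "after": {
            "labels": ["Person"],
            "properties": {"email": "john.doe@example.com", "first_name": "John", "last_name": "Doe"},
        },
    },
}
sid_query, sid_params = node_merge_by_source_id(source_node_event)
print(sid_query)
```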

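Consider this relationship creation event. As before, the first payload is in the Change Data Capture format and the second is in the Neo4j Streams format.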

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

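The relationship is persisted as follows. The sink connector uses the elementId field (Change Data Capture events) or the id field (Neo4j Streams events) of the start and end nodes in the change event to create or update the relationship. The first line shows the result for the Change Data Capture event, the second for the Neo4j Streams event.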

(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})-[:KNOWS {since: "2012-01-01", sourceId: "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"})
(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "123"})-[:KNOWS {since: "2012-01-01", sourceId: "123"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "456"})
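For relationships, the same idea applies to three identifiers at once: the start node, the end node, and the relationship itself. The Python sketch below assumes a Change Data Capture event. The function name `relationship_merge_by_source_id` is hypothetical, and the query is illustrative rather than what the connector executes.

```python
def relationship_merge_by_source_id(
    event: dict, label_name: str = "SourceEvent", property_name: str = "sourceId"
) -> tuple[str, dict]:
    """Build an illustrative MERGE that keys nodes and relationship on elementId."""
    query = (
        f"MERGE (s:`{label_name}` {{`{property_name}`: $startId}}) "
        f"MERGE (e:`{label_name}` {{`{property_name}`: $endId}}) "
        f"MERGE (s)-[r:`{event['type']}` {{`{property_name}`: $relId}}]->(e) "
        "SET r += $properties"
    )
    params = {
        "startId": event["start"]["elementId"],
        "endId": event["end"]["elementId"],
        "relId": event["elementId"],
        "properties": event["state"]["after"]["properties"],
    }
    return query, params


# The "event" part of the relationship creation event shown above, trimmed.
source_rel_event = {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "start": {"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"},
    "end": {"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"},
    "state": {"before": None, "after": {"properties": {"since": "2012-01-01"}}},
}
sid_rel_query, sid_rel_params = relationship_merge_by_source_id(source_rel_event)
print(sid_rel_query)
```

Unlike the Schema strategy, nothing here depends on constraints in the source database, which is why the marker label and identifier property are needed on the target side.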

Creating the sink instance

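Based on the examples above, you can use one of the following configurations. Pick one of the message serialization format examples (Avro, JSON Schema, or Protobuf) and save it as a file named sink.cdc.source-id.neo4j.json in a local directory.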

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.source-id.neo4j.json

You can now access your Confluent Control Center instance at http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.
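As an alternative to the Control Center UI, the connector state can also be read from the Kafka Connect REST API status endpoint. The following Python sketch assumes Kafka Connect is reachable at http://localhost:8083, as in the curl command, and that the connector is named Neo4jSinkConnectorAVRO as in the first configuration. The helper names are hypothetical.

```python
import json
import urllib.request


def status_url(name: str, connect_url: str = "http://localhost:8083") -> str:
    """URL of the Kafka Connect status endpoint for a connector."""
    return f"{connect_url}/connectors/{name}/status"


def is_running(status: dict) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name: str) -> dict:
    """Fetch and decode the status document (requires a running Kafka Connect)."""
    with urllib.request.urlopen(status_url(name)) as response:
        return json.load(response)


# Usage, assuming the connector has been created:
#
#   print(is_running(fetch_status("Neo4jSinkConnectorAVRO")))
```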