Change data capture strategy
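This strategy ingests CDC events coming from another Neo4j instance. The events can be generated either by a source connector instance configured with the change data capture strategy, or by the deprecated Neo4j Streams plugin.
Change data capture events must be generated by the matching version of the source connector, and must be produced with a value converter that supports schemas.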
Two sub-strategies are available: schema and source ID.
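Schema sub-strategy
The schema strategy merges nodes and relationships using the constraints declared in the change event. This preserves the structure of the source schema.
To configure this strategy, declare the list of topics from which change events are read.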
"neo4j.cdc.schema.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"
Example
Suppose your sink connector subscribes to the following topics:
"topics": "topic.1,topic.2"
To use the cdc.schema strategy, declare the list of topics from which change events should be consumed:
"neo4j.cdc.schema.topics": "topic.1,topic.2"
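A Kafka Connect sink only receives records from the topics it subscribes to. As a result, a topic listed under neo4j.cdc.schema.topics but missing from topics would never reach the strategy. The following Python sketch is a hypothetical pre-deployment check for that case. The helper names are illustrative and are not part of the connector.

```python
def split_topics(value: str) -> list[str]:
    """Split a comma-separated topic list, dropping blanks and surrounding spaces."""
    return [topic.strip() for topic in value.split(",") if topic.strip()]


def unsubscribed_strategy_topics(config: dict[str, str]) -> list[str]:
    """Return topics declared for the cdc.schema strategy but absent from "topics".

    Hypothetical helper: it only compares the two settings and does not
    replicate the connector's own configuration validation.
    """
    subscribed = set(split_topics(config.get("topics", "")))
    declared = split_topics(config.get("neo4j.cdc.schema.topics", ""))
    return [topic for topic in declared if topic not in subscribed]


example = {
    "topics": "topic.1,topic.2",
    "neo4j.cdc.schema.topics": "topic.1,topic.2",
}
print(unsubscribed_strategy_topics(example))  # []
```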
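Each change event is projected into a graph entity.
Consider this node creation event.
Change data capture format (source connector):
{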
"id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
"seq": 0,
"txId": 12,
"metadata": {
"authenticatedUser": "neo4j",
"captureMode": "FULL",
"connectionClient": "192.168.65.1:46246",
"connectionServer": "172.17.0.2:7687",
"connectionType": "bolt",
"databaseName": "neo4j",
"executingUser": "neo4j",
"serverId": "7528cb82",
"txCommitTime": "2024-03-03T20:51:56.769Z",
"txMetadata": {
"app": "cypher-shell_v5.6.0",
"type": "user-direct"
},
"txStartTime": "2024-03-03T20:51:56.714Z"
},
"event": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"eventType": "n",
"operation": "c",
"keys": {
"Person": [
{
"first_name": "John",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
],
"state": {
"before": null,
"after": {
"labels": [
"Person"
],
"properties": {
"email": "john.doe@example.com",
"first_name": "John",
"last_name": "Doe"
}
}
}
}
}
Neo4j Streams format (deprecated plugin):
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.example.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": [
"Person"
],
"properties": {
"email": "john.doe@example.com",
"last_name": "Doe",
"first_name": "John"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [
{
"label": "Person",
"properties": [
"first_name",
"last_name"
],
"type": "UNIQUE"
}
]
}
}
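The node is persisted as shown below. To create or update the node, the sink connector uses the keys field (change data capture format) or the schema field (Neo4j Streams format). It adds no extra properties or labels. The result is the same for both event formats.
(:Person {first_name: "John", last_name: "Doe", email: "john.doe@example.com"})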
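The following minimal Python sketch makes the merge semantics concrete. It turns either example node creation event into a parameterized Cypher MERGE keyed on the node's key properties. It works under simplifying assumptions: one label, the first key only, and the first UNIQUE constraint only. It is not the connector's actual implementation, and the function names are hypothetical.

```python
from typing import Any


def quote(name: str) -> str:
    """Backtick-quote a Cypher label or property key."""
    return "`" + name.replace("`", "``") + "`"


def node_keys(message: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (label, key properties) for a node creation message."""
    if "event" in message:  # change data capture format: use event.keys
        label, key_list = next(iter(message["event"]["keys"].items()))
        return label, dict(key_list[0])
    # Neo4j Streams format: take the first UNIQUE constraint from "schema"
    # and read the constrained values from the "after" state.
    props = message["payload"]["after"]["properties"]
    for constraint in message["schema"]["constraints"]:
        if constraint["type"] == "UNIQUE":
            names = constraint["properties"]
            return constraint["label"], {name: props[name] for name in names}
    raise ValueError("message carries no UNIQUE constraint to merge on")


def merge_node(message: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Build a MERGE on the key properties, then copy all properties over."""
    label, keys = node_keys(message)
    if "event" in message:
        after = message["event"]["state"]["after"]
    else:
        after = message["payload"]["after"]
    params: dict[str, Any] = {"props": after["properties"]}
    fields = []
    for i, (name, value) in enumerate(keys.items()):
        params[f"key_{i}"] = value
        fields.append(f"{quote(name)}: $key_{i}")
    query = f"MERGE (n:{quote(label)} {{{', '.join(fields)}}}) SET n += $props"
    return query, params
```

Keying the MERGE only on the constrained properties, rather than on every property, lets a later update event for the same person find the existing node.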
Consider this relationship creation event.
Change data capture format (source connector):
{
"id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
"txId": 20,
"seq": 0,
"metadata": {
"authenticatedUser": "neo4j",
"captureMode": "FULL",
"connectionClient": "192.168.65.1:46246",
"connectionServer": "172.17.0.2:7687",
"connectionType": "bolt",
"databaseName": "neo4j",
"executingUser": "neo4j",
"serverId": "7528cb82",
"txCommitTime": "2024-03-03T21:34:02.965Z",
"txMetadata": {
"app": "cypher-shell_v5.6.0",
"type": "user-direct"
},
"txStartTime": "2024-03-03T21:34:02.867Z"
},
"event": {
"elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"type": "KNOWS",
"eventType": "r",
"operation": "c",
"start": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"keys": {
"Person": [
{
"first_name": "John",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
]
},
"end": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
"keys": {
"Person": [
{
"first_name": "Mary",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
]
},
"keys": [],
"state": {
"before": null,
"after": {
"properties": {
"since": "2012-01-01"
}
}
}
}
}
Neo4j Streams format (deprecated plugin):
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.example.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": [
"Person"
],
"id": "123",
"ids": {
"last_name": "Doe",
"first_name": "John"
}
},
"end": {
"labels": [
"Person"
],
"id": "456",
"ids": {
"last_name": "Doe",
"first_name": "Mary"
}
},
"after": {
"properties": {
"since": "2012-01-01"
}
}
},
"schema": {
"properties": {
"since": "LocalDateTime"
},
"constraints": [
{
"label": "KNOWS",
"properties": [
"since"
],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}
]
}
}
The relationship is persisted as shown below. To create or update the relationship, the sink connector uses the keys of the start and end nodes in the change event (ids in the Neo4j Streams format). Again, it adds no extra properties or labels. The result is the same for both event formats.
(:Person {last_name: "Doe", first_name: "John"})-[:KNOWS {since: "2012-01-01"}]->(:Person {last_name: "Doe", first_name: "Mary"})
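The relationship case can be sketched the same way. This hypothetical Python helper first MERGEs the start and end nodes on their key properties: keys in the change data capture format, ids in the Neo4j Streams format. It then MERGEs the relationship between them. Creating missing endpoints with MERGE is an assumption of this sketch; the connector may handle absent endpoints differently.

```python
from typing import Any


def quote(name: str) -> str:
    """Backtick-quote a Cypher label, relationship type or property key."""
    return "`" + name.replace("`", "``") + "`"


def endpoint(node: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (label, key properties) of a start or end node."""
    if "keys" in node:  # change data capture format
        label, key_list = next(iter(node["keys"].items()))
        return label, dict(key_list[0])
    return node["labels"][0], dict(node["ids"])  # Neo4j Streams format


def merge_relationship(message: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """MERGE both endpoints by their keys, then MERGE the relationship."""
    if "event" in message:
        rel = message["event"]
        rel_type, props = rel["type"], rel["state"]["after"]["properties"]
    else:
        rel = message["payload"]
        rel_type, props = rel["label"], rel["after"]["properties"]
    params: dict[str, Any] = {"props": props}
    clauses = []
    for side, var in (("start", "src"), ("end", "dst")):
        label, keys = endpoint(rel[side])
        fields = []
        # Sort key names so both formats yield the same statement.
        for i, name in enumerate(sorted(keys)):
            params[f"{var}_{i}"] = keys[name]
            fields.append(f"{quote(name)}: ${var}_{i}")
        clauses.append(f"MERGE ({var}:{quote(label)} {{{', '.join(fields)}}})")
    clauses.append(f"MERGE (src)-[r:{quote(rel_type)}]->(dst) SET r += $props")
    return " ".join(clauses), params
```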
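Create the sink instance
Based on the examples above, you can use one of the following configurations. Pick the example for your message serialization format and save it in a local directory as a file named sink.cdc.schema.neo4j.json.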
{
"name": "Neo4jSinkConnectorAVRO",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.schema.topics": "topic.1,topic.2"
}
}
{
"name": "Neo4jSinkConnectorJSONSchema",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.schema.topics": "topic.1,topic.2"
}
}
{
"name": "Neo4jSinkConnectorProtobuf",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.optional.for.nullables": true,
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.optional.for.nullables": true,
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.schema.topics": "topic.1,topic.2"
}
}
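The three configurations above differ only in their converter settings. If you would rather generate the file than copy it, the following Python sketch rebuilds the variants from one template. sink_config and write_config are hypothetical helpers. All values, including the placeholder password, are copied from the examples.

```python
import json
from typing import Any

# Converter classes and registry URL taken from the example configurations.
CONVERTERS = {
    "AVRO": "io.confluent.connect.avro.AvroConverter",
    "JSONSchema": "io.confluent.connect.json.JsonSchemaConverter",
    "Protobuf": "io.confluent.connect.protobuf.ProtobufConverter",
}
REGISTRY_URL = "http://schema-registry:8081"


def sink_config(fmt: str, topics: list[str]) -> dict[str, Any]:
    """Return the connector definition for one serialization format."""
    converter = CONVERTERS[fmt]
    topic_list = ",".join(topics)
    config: dict[str, Any] = {
        "topics": topic_list,
        "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
        "key.converter": converter,
        "key.converter.schema.registry.url": REGISTRY_URL,
        "value.converter": converter,
        "value.converter.schema.registry.url": REGISTRY_URL,
        "neo4j.uri": "neo4j://neo4j:7687",
        "neo4j.authentication.type": "BASIC",
        "neo4j.authentication.basic.username": "neo4j",
        "neo4j.authentication.basic.password": "password",
        "neo4j.cdc.schema.topics": topic_list,
    }
    if fmt == "Protobuf":
        # Only the Protobuf example sets these two flags.
        config["key.converter.optional.for.nullables"] = True
        config["value.converter.optional.for.nullables"] = True
    return {"name": f"Neo4jSinkConnector{fmt}", "config": config}


def write_config(path: str, fmt: str, topics: list[str]) -> None:
    """Write the chosen variant as pretty-printed JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(sink_config(fmt, topics), f, indent=2)
```

For example, write_config("sink.cdc.schema.neo4j.json", "AVRO", ["topic.1", "topic.2"]) writes the Avro variant.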
Load the configuration into Kafka Connect with this REST call:
curl -X POST https://:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cdc.schema.neo4j.json
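If curl is not available, you can make the same REST call with Python's standard library. This is a sketch. The base URL http://localhost:8083 is an assumption: the port matches the one used above, but the host is not given. The helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API listens here; adjust to your deployment.
CONNECT_URL = "http://localhost:8083"


def build_create_request(
    body: bytes, base_url: str = CONNECT_URL
) -> urllib.request.Request:
    """Build the POST /connectors request that the curl command sends."""
    json.loads(body)  # fail early if the payload is not valid JSON
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def create_connector(config_path: str, base_url: str = CONNECT_URL) -> dict:
    """Send the connector configuration file and return Kafka Connect's reply."""
    with open(config_path, "rb") as f:
        request = build_create_request(f.read(), base_url)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Calling create_connector("sink.cdc.schema.neo4j.json") sends the request. urlopen raises urllib.error.HTTPError for error responses, for example when a connector with the same name already exists.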
You can now access your Confluent Control Center instance at https://:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.
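You can also query Kafka Connect's REST API directly instead of using Control Center. GET /connectors/<name>/status reports the state of the connector and of each of its tasks. The Python sketch below assumes the REST API is at http://localhost:8083. It treats the connector as healthy only when the connector and every task report RUNNING. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Any

# Assumption: Kafka Connect's REST API listens here; adjust to your deployment.
CONNECT_URL = "http://localhost:8083"


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict[str, Any]:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}/status"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def is_running(status: dict[str, Any]) -> bool:
    """True only if the connector and all of its tasks report RUNNING.

    A connector with no tasks yet is treated as not (yet) running.
    """
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
```

For example, is_running(fetch_status("Neo4jSinkConnectorAVRO")) checks the Avro connector from the configuration above.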
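Source ID sub-strategy
The source ID strategy merges nodes and relationships by the elementId or id value of the source entity. It stores this value as an explicit property on the target nodes and relationships, and marks the nodes with an explicit label.
To configure this strategy, declare the list of topics from which change events are read. Optionally, you can also set the name of the marker label and the name of the property that stores the source entity's elementId or id value.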
"neo4j.cdc.source-id.topics": "<comma-separated list of topics>"
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>"
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"
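The two optional settings fall back to the defaults shown above. This hypothetical Python helper mirrors that resolution. It is useful, for example, when a script needs to know which label and property the sink will write. It does not use the connector's own configuration classes.

```python
from typing import Any, NamedTuple

# Defaults as stated in the configuration reference above.
DEFAULT_LABEL = "SourceEvent"
DEFAULT_PROPERTY = "sourceId"


class SourceIdSettings(NamedTuple):
    topics: list[str]
    label_name: str
    property_name: str


def source_id_settings(config: dict[str, Any]) -> SourceIdSettings:
    """Read the cdc.source-id settings, falling back to the documented defaults."""
    raw_topics = config.get("neo4j.cdc.source-id.topics", "")
    return SourceIdSettings(
        topics=[t.strip() for t in raw_topics.split(",") if t.strip()],
        label_name=config.get("neo4j.cdc.source-id.label-name", DEFAULT_LABEL),
        property_name=config.get(
            "neo4j.cdc.source-id.property-name", DEFAULT_PROPERTY
        ),
    )
```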
Example
Suppose your sink connector subscribes to the following topics:
"topics": "topic.1,topic.2"
To use the cdc.source-id strategy, declare the list of topics from which change events should be consumed:
"neo4j.cdc.source-id.topics": "topic.1,topic.2"
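Each change event is projected into a graph entity.
Consider this node creation event.
Change data capture format (source connector):
{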
"id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
"seq": 0,
"txId": 12,
"metadata": {
"authenticatedUser": "neo4j",
"captureMode": "FULL",
"connectionClient": "192.168.65.1:46246",
"connectionServer": "172.17.0.2:7687",
"connectionType": "bolt",
"databaseName": "neo4j",
"executingUser": "neo4j",
"serverId": "7528cb82",
"txCommitTime": "2024-03-03T20:51:56.769Z",
"txMetadata": {
"app": "cypher-shell_v5.6.0",
"type": "user-direct"
},
"txStartTime": "2024-03-03T20:51:56.714Z"
},
"event": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"eventType": "n",
"operation": "c",
"keys": {
"Person": [
{
"first_name": "John",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
],
"state": {
"before": null,
"after": {
"labels": [
"Person"
],
"properties": {
"email": "john.doe@example.com",
"first_name": "John",
"last_name": "Doe"
}
}
}
}
}
Neo4j Streams format (deprecated plugin):
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.example.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": [
"Person"
],
"properties": {
"email": "john.doe@example.com",
"last_name": "Doe",
"first_name": "John"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [
{
"label": "Person",
"properties": [
"first_name",
"last_name"
],
"type": "UNIQUE"
}
]
}
}
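The node is persisted as shown below. To create or update the node, the sink connector uses the elementId field (change data capture format) or the id field (Neo4j Streams format) of the node change event.
From the change data capture event:
(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})
From the Neo4j Streams event:
(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "1004"})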
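The following minimal Python sketch illustrates this behaviour, assuming the default label SourceEvent and property sourceId. The node is merged on the marker label and the source id alone, and the original labels and properties are then set on it. It is illustrative only, not taken from the connector's code, and the function name is hypothetical.

```python
from typing import Any


def quote(name: str) -> str:
    """Backtick-quote a Cypher label or property key."""
    return "`" + name.replace("`", "``") + "`"


def merge_source_id_node(
    message: dict[str, Any], label: str = "SourceEvent", prop: str = "sourceId"
) -> tuple[str, dict[str, Any]]:
    """MERGE a node on its source id, then apply the source labels and properties."""
    if "event" in message:  # change data capture format: key on elementId
        source_id = message["event"]["elementId"]
        after = message["event"]["state"]["after"]
    else:  # Neo4j Streams format: key on payload.id
        source_id = message["payload"]["id"]
        after = message["payload"]["after"]
    set_items = []
    if after["labels"]:
        set_items.append("n" + "".join(f":{quote(name)}" for name in after["labels"]))
    set_items.append("n += $props")
    query = (
        f"MERGE (n:{quote(label)} {{{quote(prop)}: $source_id}}) "
        f"SET {', '.join(set_items)}"
    )
    return query, {"source_id": source_id, "props": after["properties"]}
```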
Consider this relationship creation event.
Change data capture format (source connector):
{
"id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
"txId": 20,
"seq": 0,
"metadata": {
"authenticatedUser": "neo4j",
"captureMode": "FULL",
"connectionClient": "192.168.65.1:46246",
"connectionServer": "172.17.0.2:7687",
"connectionType": "bolt",
"databaseName": "neo4j",
"executingUser": "neo4j",
"serverId": "7528cb82",
"txCommitTime": "2024-03-03T21:34:02.965Z",
"txMetadata": {
"app": "cypher-shell_v5.6.0",
"type": "user-direct"
},
"txStartTime": "2024-03-03T21:34:02.867Z"
},
"event": {
"elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"type": "KNOWS",
"eventType": "r",
"operation": "c",
"start": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
"keys": {
"Person": [
{
"first_name": "John",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
]
},
"end": {
"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
"keys": {
"Person": [
{
"first_name": "Mary",
"last_name": "Doe"
}
]
},
"labels": [
"Person"
]
},
"keys": [],
"state": {
"before": null,
"after": {
"properties": {
"since": "2012-01-01"
}
}
}
}
}
Neo4j Streams format (deprecated plugin):
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.example.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": [
"Person"
],
"id": "123",
"ids": {
"last_name": "Doe",
"first_name": "John"
}
},
"end": {
"labels": [
"Person"
],
"id": "456",
"ids": {
"last_name": "Doe",
"first_name": "Mary"
}
},
"after": {
"properties": {
"since": "2012-01-01"
}
}
},
"schema": {
"properties": {
"since": "LocalDateTime"
},
"constraints": [
{
"label": "KNOWS",
"properties": [
"since"
],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}
]
}
}
The relationship is persisted as shown below. To create or update the relationship, the sink connector uses the elementId field (change data capture format) or the id field (Neo4j Streams format) of the start and end nodes in the change event.
From the change data capture event:
(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})-[:KNOWS {since: "2012-01-01", sourceId: "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"})
From the Neo4j Streams event:
(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "123"})-[:KNOWS {since: "2012-01-01", sourceId: "123"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "456"})
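The following hypothetical Python sketch applies the same idea to relationships. Both endpoints are merged on the marker label and their source ids, and the relationship is merged on its own source id. The endpoints' other labels and properties (Person, first_name, and so on) are assumed to arrive through their own node events, which is why this sketch does not set them. Default label and property names are assumed.

```python
from typing import Any


def quote(name: str) -> str:
    """Backtick-quote a Cypher label, relationship type or property key."""
    return "`" + name.replace("`", "``") + "`"


def merge_source_id_relationship(
    message: dict[str, Any], label: str = "SourceEvent", prop: str = "sourceId"
) -> tuple[str, dict[str, Any]]:
    """MERGE both endpoints and the relationship on their source ids."""
    if "event" in message:  # change data capture format: elementId values
        rel = message["event"]
        rel_type = rel["type"]
        rel_id = rel["elementId"]
        start_id, end_id = rel["start"]["elementId"], rel["end"]["elementId"]
        props = rel["state"]["after"]["properties"]
    else:  # Neo4j Streams format: id values
        rel = message["payload"]
        rel_type = rel["label"]
        rel_id = rel["id"]
        start_id, end_id = rel["start"]["id"], rel["end"]["id"]
        props = rel["after"]["properties"]
    marker, key = quote(label), quote(prop)
    query = (
        f"MERGE (src:{marker} {{{key}: $start_id}}) "
        f"MERGE (dst:{marker} {{{key}: $end_id}}) "
        f"MERGE (src)-[r:{quote(rel_type)} {{{key}: $rel_id}}]->(dst) "
        "SET r += $props"
    )
    params = {"start_id": start_id, "end_id": end_id, "rel_id": rel_id, "props": props}
    return query, params
```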
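Create the sink instance
Based on the examples above, you can use one of the following configurations. Pick the example for your message serialization format and save it in a local directory as a file named sink.cdc.source-id.neo4j.json.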
{
"name": "Neo4jSinkConnectorAVRO",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.source-id.topics": "topic.1,topic.2",
"neo4j.cdc.source-id.label-name": "SourceEvent",
"neo4j.cdc.source-id.property-name": "sourceId"
}
}
{
"name": "Neo4jSinkConnectorJSONSchema",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.source-id.topics": "topic.1,topic.2",
"neo4j.cdc.source-id.label-name": "SourceEvent",
"neo4j.cdc.source-id.property-name": "sourceId"
}
}
{
"name": "Neo4jSinkConnectorProtobuf",
"config": {
"topics": "topic.1,topic.2",
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.optional.for.nullables": true,
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.optional.for.nullables": true,
"neo4j.uri": "neo4j://neo4j:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.cdc.source-id.topics": "topic.1,topic.2",
"neo4j.cdc.source-id.label-name": "SourceEvent",
"neo4j.cdc.source-id.property-name": "sourceId"
}
}
Load the configuration into Kafka Connect with this REST call:
curl -X POST https://:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cdc.source-id.neo4j.json
You can now access your Confluent Control Center instance at https://:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.