数据变更捕获策略
数据变更捕获策略利用了 Neo4j 和 Aura Enterprise 5 提供的数据变更捕获功能,并且是源连接器实例的首选策略,因为它不需要任何模式更改并且可以可靠地捕获删除操作。请确保在使用此策略配置源实例之前,已完成数据变更捕获 > 入门中描述的必要步骤。
为了配置此策略,您需要定义模式和选择器,这些模式和选择器描述了您希望跟踪哪些节点或关系的更改并将它们分配给主题。
当数据库从本地安装中的备份恢复或从快照恢复,或在 Neo4j Aura 中暂停和恢复时,现有的更改标识符将不再起作用,您需要从头开始重新配置源实例。有关更多信息,请参阅数据变更捕获 > 恢复备份和快照。 |
配置
首先,您需要为连接器实例选择 CDC 策略;
"neo4j.source-strategy": "CDC"
其次,您需要定义您的模式并将它们映射到您的主题;
"neo4j.cdc.topic.my-topic.patterns": "(:Person),(:Person)-[:KNOWS]-(:Person)"
虽然以上配置是为了方便起见,但如果您需要为更改事件定义其他过滤器,例如操作、更改的属性名称或元数据字段,则需要使用如下所示的索引配置方法;
"neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)", (1)
"neo4j.cdc.topic.my-topic.patterns.0.operation": "create", (2)
"neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname", (3)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j", (4)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j", (5)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales", (6)
"neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
"neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
"neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
1 | 一个识别图实体以监视更改的模式。 |
2 | 我们感兴趣的单个操作,可以是create 、update 或delete 。 |
3 | 需要更新的属性列表,以便返回更改消息。属性的添加和删除也计为更新。 |
4 | 执行更改的已认证用户。 |
5 | 执行更改的执行用户。通常与已认证用户相同,但如果使用了模拟,则可能不同。 |
6 | 需要与执行更改的事务的事务元数据匹配的键值对。 |
在以上示例中,只有pattern 设置是强制性的,其他设置是可选的,可以根据您的需求添加。 |
创建源实例
根据以上示例,您可以使用以下配置之一。选择消息序列化格式示例之一,并将其另存为名为source.cdc.neo4j.json
的文件到本地目录。
现在,我们将通过调用以下 REST 调用来创建源实例
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.cdc.neo4j.json
这将创建一个 Kafka Connect 源实例,该实例将使用您首选的序列化格式,将与提供的选择器匹配的更改事件消息发送到my-topic
主题。在 Control Center 中,确认源连接器已在 Connect 选项卡(在 connect-default 下)中创建。
模式
节点模式
节点模式类似于 Cypher 节点模式进行定义。
-
以
(
开头。 -
[可选] 定义可选的标签列表,用
:
分隔,例如:Person
或:Person:Employee
。 -
[可选] 使用
{
打开属性部分。-
[可选] 定义用作键过滤器的属性及其值,格式为
key: value
。可以定义多个属性,并且必须用,
分隔。这些属性必须对应于节点键约束属性。 -
或者;
-
无或
*
,表示将 JSON 对象中的所有属性分配给节点。 -
用
,
分隔的属性名称列表,要从 JSON 对象分配给节点。 -
用
-
前缀表示的属性名称列表,表示不要分配给节点,JSON 对象中的所有其他属性都将分配给节点。
-
-
使用
}
关闭属性部分。
-
-
以
)
结尾。
您不能混合包含和排除属性,因此您的模式必须包含所有排除属性或包含属性。 |
示例
-
选择任何节点上的所有更改。
()
-
选择标签为
:User
的节点上的所有更改。(:User)
-
选择同时具有
:User
和:Employee
标签的节点上的所有更改。(:User:Employee)
-
选择标签为
:User
的节点上的所有更改,并且仅在更改事件中包含name
和surname
属性。(:User{name, surname})
-
选择标签为
:User
的节点上的所有更改,并在更改事件中排除adress
和dob
属性。(:User{-address, -dob})
-
选择标签为
:User
且键属性userId
等于1001
的节点上的所有更改,并在更改事件中包含name
和surname
属性。(:User{userId: 1001, name, surname})
此示例需要在 :User
标签的userId
属性上设置节点键约束。 -
选择同时具有
:User
和Employee
标签的节点上的所有更改,以及键属性name
等于john
和surname
等于doe
的节点上的所有更改。(:User:Employee{name: 'john', surname: 'doe'})
此示例需要在 :User
或:Employee
标签或两者上都设置name
和surname
属性的节点键约束。
关系模式
关系模式类似于 Cypher 关系模式进行定义。
-
开始节点的节点模式,没有任何属性包含或排除列表。
-
-[
. -
定义关系类型,以
:
开头,例如:BOUGHT
或:KNOWS
。 -
[可选] 使用
{
打开属性部分。-
[可选] 定义用作键过滤器的属性及其值,格式为
key: value
。 -
或者;
-
无或
*
,表示将 JSON 对象中的所有属性分配给节点。 -
用
,
分隔的属性名称列表,要从 JSON 对象分配给节点。 -
用
-
前缀表示的属性名称列表,表示不要分配给节点,JSON 对象中的所有其他属性都将分配给节点。
-
-
使用
}
关闭属性部分。
-
-
]->
. -
结束节点的节点模式,没有任何属性包含或排除列表。
您不能混合包含和排除,因此您的模式必须包含所有排除属性或包含属性。 |
示例
-
选择
:BOUGHT
关系上的所有更改。()-[:BOUGHT]->()
-
选择标签为
:User
的开始节点和标签为:Product
的结束节点的:BOUGHT
关系上的所有更改。(:User)-[:BOUGHT]->(:Product)
-
选择标签为
:User
和:Employee
的开始节点和标签为:Product
的结束节点的:BOUGHT
关系上的所有更改。(:User:Employee)-[:BOUGHT]->(:Product)
-
选择标签为
:User
的开始节点和标签为:Product
的结束节点的:BOUGHT
关系上的所有更改,并且仅在更改事件中包含price
和currency
属性。(:User)-[:BOUGHT{price, currency}]->(:Product)
-
选择标签为
:User
的开始节点和标签为:Product
的结束节点的:BOUGHT
关系上的所有更改,并从更改事件中排除card
属性。(:User)-[:BOUGHT{-card}]->(:Product)
-
选择由键属性
contractId
等于5910
标识的:WORKS_FOR
关系上的所有更改。()-[:WORKS_FOR{contractId: 5910}]->()
此示例需要在 :WORKS_FOR
关系类型的contractId
属性上设置关系键约束。 -
选择由键属性
contractId
等于5910
标识的:WORKS_FOR
关系上的所有更改,并从更改事件中排除salary
属性。()-[:WORKS_FOR{contractId: 5910,-salary}]->()
-
选择从标签为
:User
且键属性userId
等于1001
的节点开始的关系上的所有更改。(:User{userId: 1001})-[]->()