模式策略
此策略允许您通过提取模式从 Kafka 消息中提取节点和关系。
配置
要为所需主题配置模式策略,您必须遵循以下约定
"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"
例如,给定以下发布到 user
和 lives_in
主题的消息
{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}
您可以通过提供以下配置将其转换为节点
"neo4j.pattern.topic.user": "(:User{!id: userId})"
您还可以提供关系模式将其转换为路径,例如 (n)-[r]→(m)
,方法是提供以下配置
"neo4j.pattern.topic.lives_in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
关系键属性仅适用于 Neo4j 企业版 5.7 及更高版本,以及 AuraDB 5。 |
创建接收器实例
基于以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,将其另存为名为 sink.pattern.neo4j.json
的文件,并将其保存到本地目录中。
使用以下 REST 调用将配置加载到 Kafka Connect 中
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.pattern.neo4j.json
现在,您可以访问 Confluent 控制中心实例,地址为 http://localhost:9021/clusters
。验证已配置的连接器实例是否在 connect-default
下的 连接
选项卡中运行。
模式
节点模式
节点模式的定义类似于 Cypher 节点模式。
-
以
(
开头。 -
定义可选的标签列表,用
:
分隔,例如:Person
或:Person:Employee
。 -
使用
{
打开属性部分。 -
定义要作为键处理的属性,每个属性都以
!
开头,其中至少需要提供一个键属性。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为userId: __key.user.id
。默认情况下,可以引用消息字段,如__timestamp
、__headers
、__key
和__value
。 -
要么;
-
无或
*
,表示将消息中的所有属性分配给节点。 -
要从消息分配给节点的属性名称列表。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为
userName: __value.user.name
。默认情况下,可以引用消息字段,如__timestamp
、__headers
、__key
和__value
。 -
要从消息分配给节点的属性名称列表,每个属性都以
-
开头,消息中的所有其他属性都将分配给节点。
-
-
使用
}
关闭属性部分。 -
以
)
结尾。
您不能在模式中混合包含和排除,即,您的模式必须包含所有排除或包含属性。 |
示例
-
对
User
标签进行MERGE
操作,其中userId
被视为键,并将传入消息值中的所有属性分配给节点(:User{!userId})
或者
(:User{!userId, *})
-
对
User
标签进行MERGE
操作,其中userId
被视为键,并将传入消息中的 仅surname
属性分配给节点(:User{!userId, surname})
-
对
User
标签进行MERGE
操作,其中userId
被视为键,并将传入消息中的 仅surname
和address.city
属性分配给节点(:User{!userId, surname, city: address.city})
-
对
User
标签进行MERGE
操作,其中userId
被视为键,并将传入消息中 排除address
属性之外的所有属性分配给节点(:User{!userId, -address})
-
对
User
标签进行MERGE
操作,其中userId
被视为键,该键将从消息的键部分获取,并将传入消息的值部分中的 仅name
和surname
属性分配给节点(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})
-
对
User
标签进行MERGE
操作,其中userId
被视为键,该键将从传入消息的键部分获取,并将值部分中的 仅name
、surname
属性、标题中的createdBy
和当前时间戳作为createdAt
属性分配给节点(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
关系模式
关系模式的定义类似于 Cypher 关系模式。
-
起始节点的节点模式。
-
-[
-
定义关系类型,并在其前面加上
:
,例如:BOUGHT
或:KNOWS
。 -
使用
{
打开属性部分。 -
[可选] 定义要作为键处理的属性,每个属性都以
!
开头,其中至少需要提供一个键属性。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为relationshipId: __key.relationship.id
。默认情况下,可以引用消息字段,如__timestamp
、__headers
、__key
和__value
。 -
要么;
-
无或
*
,表示将消息中的所有属性分配给关系。 -
要从消息分配给关系的属性名称列表。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为
relationshipType: __value.relationship.type
。默认情况下,可以引用消息字段,如__timestamp
、__headers
、__key
和__value
。 -
要从消息分配给关系的属性名称列表,每个属性都以
-
开头,消息中的所有其他属性都将分配给关系。
-
-
使用
}
关闭属性部分。 -
]->
-
结束节点的节点模式。
您不能在模式中混合包含和排除,即,您的模式必须包含所有排除或包含属性。 |
示例
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并MERGE
它们之间的一个BOUGHT
关系,并将其他所有消息属性分配给关系(:User{!userId})-[:BOUGHT]->(:Product{!productId})
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并MERGE
它们之间的一个BOUGHT
关系,并将传入消息中的 仅price
和currency
属性分配给关系(:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并MERGE
它们之间的一个BOUGHT
关系,并将传入消息中的 仅price
、currency
和shippingAddress.city
属性分配给关系(:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并MERGE
它们之间的一个BOUGHT
关系,并将传入消息中 排除shippingAddress
属性之外的所有属性分配给关系(:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并将消息中的userFirstName
和userLastName
属性分配给User
节点,并MERGE
它们之间的一个BOUGHT
关系,并将消息中的 仅price
和currency
属性分配给关系(:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})
-
对
User
和Product
标签进行MERGE
操作,其中userId
和productId
分别被视为键,并MERGE
它们之间的一个BOUGHT
关系,并将消息的键部分中的transactionId
属性作为键属性,并将消息的值部分中的date
分配给关系(:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})
墓碑记录
模式策略支持墓碑记录。为了使用它,消息键至少应包含提供的模式中存在的键属性,并且消息值应设置为null
。
无法为单个主题定义多个模式,例如从单个消息中提取多个节点或关系类型。为了实现这一点,您必须为每个模式使用不同的主题。 |