模式策略
此策略允许您通过提取模式从 Kafka 消息中提取节点和关系。
配置
要为所需主题配置模式策略,您必须遵循以下约定
"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"
例如,给定发布到 `user` 和 `lives_in` 主题的以下消息
{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}
您可以通过提供以下配置将其转换为一个节点
"neo4j.pattern.topic.user": "(:User{!id: userId})"
您还可以提供关系模式,通过提供以下配置将其转换为路径,例如 `(n)-[r]→(m)`
"neo4j.pattern.topic.lives_in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
关系键属性只能与 Neo4j 企业版 5.7 及更高版本以及 AuraDB 5 一起使用。 |
创建 Sink 实例
根据以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其保存为名为 `sink.pattern.neo4j.json` 的文件到本地目录中。
使用此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.pattern.neo4j.json
现在,您可以在 `http://localhost:9021/clusters` 下访问您的 Confluent Control Center 实例。验证配置的连接器实例是否在 `connect-default` 下的 `Connect` 选项卡中运行。
模式
节点模式
节点模式的定义类似于 Cypher 节点模式。
-
以 `(` 开始。
-
定义可选的标签列表,用 `:` 分隔,例如 `:Person` 或 `:Person:Employee`。
-
用 `{` 打开属性部分。
-
定义要用作键的属性,每个属性前缀为 `!`,且必须至少提供一个键属性。单个消息字段可以显式引用并以 `userId: __key.user.id` 的格式分配给用户定义的属性。默认情况下,消息字段可以引用为 `__timestamp`、`__headers`、`__key` 和 `__value`。
-
以下二选一:
-
无,或 `*`,表示将消息中的所有属性分配给节点。
-
要从消息中分配给节点的属性名称列表。单个消息字段可以显式引用并以 `userName: __value.user.name` 的格式分配给用户定义的属性。默认情况下,消息字段可以引用为 `__timestamp`、`__headers`、`__key` 和 `__value`。
-
不分配给节点的属性名称列表,每个名称前缀为 `-`,消息中的所有其他属性都将分配给节点。
-
-
用 `}` 关闭属性部分。
-
以 `)` 结束。
您不能在模式中混合包含和排除,即您的模式必须包含所有排除属性或所有包含属性。 |
示例
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,并将传入消息值中的所有属性分配给节点
(:User{!userId})
或
(:User{!userId, *})
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,并**仅**将传入消息中的 `surname` 属性分配给节点
(:User{!userId, surname})
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,并**仅**将传入消息中的 `surname` 和 `address.city` 属性分配给节点
(:User{!userId, surname, city: address.city})
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,并将传入消息中除 `address` 属性之外的所有属性分配给节点
(:User{!userId, -address})
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,该键将从消息的键部分获取,并**仅**将传入消息的值部分中的 `name` 和 `surname` 属性分配给节点
(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})
-
对 `User` 标签执行 `MERGE` 操作,其中 `userId` 被视为键,该键将从传入消息的键部分获取,并**仅**将值部分中的 `name` 和 `surname` 属性、来自头部信息的 `createdBy` 以及作为当前时间戳的 `createdAt` 属性分配给节点
(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
关系模式
关系模式的定义类似于 Cypher 关系模式。
-
起始节点的节点模式。
-
-[
-
定义关系类型,前缀为 `:`,例如 `:BOUGHT` 或 `:KNOWS`。
-
用 `{` 打开属性部分。
-
[可选] 定义要用作键的属性,每个属性前缀为 `!`,且必须至少提供一个键属性。单个消息字段可以显式引用并以 `relationshipId: __key.relationship.id` 的格式分配给用户定义的属性。默认情况下,消息字段可以引用为 `__timestamp`、`__headers`、`__key` 和 `__value`。
-
以下二选一:
-
无,或 `*`,表示将消息中的所有属性分配给关系。
-
要从消息中分配给关系的属性名称列表。单个消息字段可以显式引用并以 `relationshipType: __value.relationship.type` 的格式分配给用户定义的属性。默认情况下,消息字段可以引用为 `__timestamp`、`__headers`、`__key` 和 `__value`。
-
不分配给关系的属性名称列表,每个名称前缀为 `-`,消息中的所有其他属性都将分配给关系。
-
-
用 `}` 关闭属性部分。
-
]->
-
结束节点的节点模式。
您不能在模式中混合包含和排除,即您的模式必须包含所有排除属性或所有包含属性。 |
示例
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,将所有其他消息属性分配给该关系
(:User{!userId})-[:BOUGHT]->(:Product{!productId})
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,并**仅**将传入消息中的 `price` 和 `currency` 属性分配给该关系
(:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,并**仅**将传入消息中的 `price`、`currency` 和 `shippingAddress.city` 属性分配给该关系
(:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,将传入消息中**除** `shippingAddress` 之外的所有属性分配给该关系
(:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,将消息中的 `userFirstName` 和 `userLastName` 属性分配给 `User` 节点,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,**仅**将消息中的 `price` 和 `currency` 属性分配给该关系
(:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})
-
对 `User` 和 `Product` 标签执行 `MERGE` 操作,其中 `userId` 和 `productId` 分别被视为键,并在它们之间 `MERGE` 一个 `BOUGHT` 关系,**仅**将消息键部分中的 `transactionId` 属性作为键属性,并将消息值部分中的 `date` 分配给该关系
(:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})
Tombstone 记录
模式策略支持tombstone 记录。为了使用它,消息键应至少包含所提供模式中存在的键属性,并且消息值应设置为 `null`。
无法为一个主题定义多个模式,例如从单个消息中提取多个节点或关系类型。为了实现这一点,您必须为每个模式使用不同的主题。 |