模式策略

此策略允许您通过提取模式从 Kafka 消息中提取节点和关系。

配置

要为所需主题配置模式策略,您必须遵循以下约定

"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"

例如,给定以下发布到 userlives_in 主题的消息

{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}

您可以通过提供以下配置将其转换为节点

"neo4j.pattern.topic.user": "(:User{!id: userId})"

您还可以提供关系模式将其转换为路径,例如 (n)-[r]→(m),方法是提供以下配置

"neo4j.pattern.topic.lives_in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
关系键属性仅适用于 Neo4j 企业版 5.7 及更高版本,以及 AuraDB 5。

创建接收器实例

基于以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,将其另存为名为 sink.pattern.neo4j.json 的文件,并将其保存到本地目录中。

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

使用以下 REST 调用将配置加载到 Kafka Connect 中

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.pattern.neo4j.json

现在,您可以访问 Confluent 控制中心实例,地址为 http://localhost:9021/clusters。验证已配置的连接器实例是否在 connect-default 下的 连接 选项卡中运行。

模式

节点模式

节点模式的定义类似于 Cypher 节点模式。

  1. ( 开头。

  2. 定义可选的标签列表,用 : 分隔,例如 :Person:Person:Employee

  3. 使用 { 打开属性部分。

  4. 定义要作为键处理的属性,每个属性都以 ! 开头,其中至少需要提供一个键属性。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为 userId: __key.user.id。默认情况下,可以引用消息字段,如 __timestamp__headers__key__value

  5. 要么;

    1. 无或 *,表示将消息中的所有属性分配给节点。

    2. 要从消息分配给节点的属性名称列表。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为 userName: __value.user.name。默认情况下,可以引用消息字段,如 __timestamp__headers__key__value

    3. 要从消息分配给节点的属性名称列表,每个属性都以 - 开头,消息中的所有其他属性都将分配给节点。

  6. 使用 } 关闭属性部分。

  7. ) 结尾。

您不能在模式中混合包含和排除,即,您的模式必须包含所有排除或包含属性。

示例

  • User 标签进行 MERGE 操作,其中 userId 被视为键,并将传入消息值中的所有属性分配给节点

    (:User{!userId})

    或者

    (:User{!userId, *})
  • User 标签进行 MERGE 操作,其中 userId 被视为键,并将传入消息中的 surname 属性分配给节点

    (:User{!userId, surname})
  • User 标签进行 MERGE 操作,其中 userId 被视为键,并将传入消息中的 surnameaddress.city 属性分配给节点

    (:User{!userId, surname, city: address.city})
  • User 标签进行 MERGE 操作,其中 userId 被视为键,并将传入消息中 排除 address 属性之外的所有属性分配给节点

    (:User{!userId, -address})
  • User 标签进行 MERGE 操作,其中 userId 被视为键,该键将从消息的键部分获取,并将传入消息的值部分中的 namesurname 属性分配给节点

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})
  • User 标签进行 MERGE 操作,其中 userId 被视为键,该键将从传入消息的键部分获取,并将值部分中的 namesurname 属性、标题中的 createdBy 和当前时间戳作为 createdAt 属性分配给节点

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})

关系模式

关系模式的定义类似于 Cypher 关系模式。

  1. 起始节点的节点模式。

  2. -[

  3. 定义关系类型,并在其前面加上 :,例如 :BOUGHT:KNOWS

  4. 使用 { 打开属性部分。

  5. [可选] 定义要作为键处理的属性,每个属性都以 ! 开头,其中至少需要提供一个键属性。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为 relationshipId: __key.relationship.id。默认情况下,可以引用消息字段,如 __timestamp__headers__key__value

  6. 要么;

    1. 无或 *,表示将消息中的所有属性分配给关系。

    2. 要从消息分配给关系的属性名称列表。可以明确地引用单个消息字段,并将其分配给用户定义的属性,格式为 relationshipType: __value.relationship.type。默认情况下,可以引用消息字段,如 __timestamp__headers__key__value

    3. 要从消息分配给关系的属性名称列表,每个属性都以 - 开头,消息中的所有其他属性都将分配给关系。

  7. 使用 } 关闭属性部分。

  8. ]->

  9. 结束节点的节点模式。

您不能在模式中混合包含和排除,即,您的模式必须包含所有排除或包含属性。

示例

  1. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并 MERGE 它们之间的一个 BOUGHT 关系,并将其他所有消息属性分配给关系

    (:User{!userId})-[:BOUGHT]->(:Product{!productId})
  2. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并 MERGE 它们之间的一个 BOUGHT 关系,并将传入消息中的 pricecurrency 属性分配给关系

    (:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})
  3. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并 MERGE 它们之间的一个 BOUGHT 关系,并将传入消息中的 pricecurrencyshippingAddress.city 属性分配给关系

    (:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})
  4. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并 MERGE 它们之间的一个 BOUGHT 关系,并将传入消息中 排除 shippingAddress 属性之外的所有属性分配给关系

    (:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})
  5. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并将消息中的 userFirstNameuserLastName 属性分配给 User 节点,并 MERGE 它们之间的一个 BOUGHT 关系,并将消息中的 pricecurrency 属性分配给关系

    (:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})
  6. UserProduct 标签进行 MERGE 操作,其中 userIdproductId 分别被视为键,并 MERGE 它们之间的一个 BOUGHT 关系,并将消息的键部分中的 transactionId 属性作为键属性,并将消息的值部分中的 date 分配给关系

    (:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})

墓碑记录

模式策略支持墓碑记录。为了使用它,消息键至少应包含提供的模式中存在的键属性,并且消息值应设置为null

无法为单个主题定义多个模式,例如从单个消息中提取多个节点或关系类型。为了实现这一点,您必须为每个模式使用不同的主题。