CUD 文件格式策略

CUD 文件格式是一个 JSON 文件,表示对节点或关系执行的操作(**C**reate、**U**pdate、**D**elete)。

配置

您可以按以下方式配置主题

"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

例如,如果您有两个主题,topic.1topic.2,您以 CUD 格式的消息发布它们,您可以如下配置接收器实例。

"neo4j.cud.topics": "topic.1,topic.2"

创建接收器实例

根据以上示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其另存为名为 sink.cud.neo4j.json 的文件到本地目录。

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

使用此 REST 调用将配置加载到 Kafka Connect 中

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cud.neo4j.json

现在,您可以通过 http://localhost:9021/clusters 访问您的 Confluent Control Center 实例。验证配置的连接器实例是否在 connect-default 下的 Connect 选项卡中运行。

CUD 格式规范

我们有两种格式

  • 节点

  • 关系

节点

节点格式定义了一组字段,这些字段描述了节点实体的操作(createupdatemergedelete)、标签和属性。

表 1. **节点格式** 字段
字段 必填 描述

op

必填

操作类型。createmergeupdatedelete 之一。

delete 操作仅适用于**单个节点**。它并非旨在从 JSON 运行通用 Cypher 查询。

properties

可选 当操作为 delete 时。

附加到节点的属性。

ids

可选 当操作为 create 时。

包含将用于查找实体的主/唯一键属性。如果您使用 _elementId(或 _id)作为属性名称,它将使用 Neo4j 的内部元素 ID 进行节点查找。

如果您将 _elementId(或 _id)与 merge 操作一起使用,它将表现为简单的更新,即如果不存在具有给定元素 ID 的节点,则不会创建它。

labels

可选

附加到节点的标签。

尽管 Neo4j 允许创建没有标签的节点,但从性能角度来看不建议这样做。

type

必填

实体类型:node

detach

可选

当操作为 delete 时,您可以指定是否执行 "detach" 删除

如果未提供值,则默认为 false

示例

  • 一个 CREATE 操作的示例;

    {
      "type": "node",
      "op": "create",
      "labels": ["Foo", "Bar"],
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    它将转换为以下 Cypher 查询

    CREATE (n:Foo:Bar) SET n = $properties
  • 一个 UPDATE 操作的示例;

    {
      "type": "node",
      "op": "update",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • 一个 MERGE 操作的示例;

    {
      "type": "node",
      "op": "merge",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    它将转换为以下 Cypher 查询

    MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • 一个 DELETE 操作的示例;

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) DELETE n
  • 一个 DELETE 操作的示例,其中 detach 为 true

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "detach": true
    }

    它将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n

关系

关系格式定义了一组字段,这些字段描述了操作(createupdatemergedelete)、关系类型、源和目标节点引用以及属性。

表 2. **关系格式** 字段
字段 必填 描述

op

必填

操作类型。createmergeupdatedelete 之一。

properties

可选 当操作为 delete 时。

附加到关系的属性。

rel_type

必填

关系类型。

ids

可选

包含将用于查找关系的主/唯一键属性。

from

必填

包含有关关系源节点的信息。op 字段只能为 mergematch,默认值为 match。有关 idslabels 字段的描述,请参阅 上面 描述的节点字段。

如果您在 ids 中使用 _elementId(或 _id)字段引用,则可以省略 labels 字段。

to

必填

包含有关关系目标节点的信息。op 字段只能为 mergematch,默认值为 match。有关 idslabels 字段的描述,请参阅 上面 描述的节点字段。

如果您在 ids 中使用 _elementId(或 _id)字段引用,则可以省略 labels 字段。

type

必填

实体类型:relationship

示例

  • 一个 CREATE 操作的示例;

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • 一个 CREATE 操作的示例,其中 合并 源节点;

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        },
        "op": "merge"
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "match"
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MERGE (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • 一个 UPDATE 操作的示例;

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • 一个 UPDATE 操作的示例,其中包含 关系 ID

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • 一个 MERGE 操作的示例;

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • 一个 MERGE 操作的示例,其中包含 关系 ID

    {
      "type": "relationship",
      "op": "MERGE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • 一个 DELETE 操作的示例;

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    DELETE r
  • 一个 DELETE 操作的示例,其中包含 关系 ID

    {
      "type": "relationship",
      "op": "DELETE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "ids": {
        "id": 5
      }
    }

    它将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    DELETE r