CUD 文件格式策略

CUD 文件格式是一种 JSON 文件,表示在节点或关系上执行的操作(建、新、除)。

配置

您可以通过以下方式配置主题

"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

例如,如果您有两个主题,topic.1topic.2,它们发布为 CUD 格式的消息,您可以按如下方式配置目标实例。

"neo4j.cud.topics": "topic.1,topic.2"

创建目标实例

根据上述示例,您可以使用以下配置之一。选择一个消息序列化格式示例,并将其保存为名为 sink.cud.neo4j.json 的文件到本地目录中。

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

使用此 REST 调用将配置加载到 Kafka Connect 中

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cud.neo4j.json

现在,您可以在 http://localhost:9021/clusters 下访问您的 Confluent Control Center 实例。在 connect-default 下的 Connect 选项卡中验证配置的连接器实例是否正在运行。

CUD 格式规范

我们有两种格式

  • 节点

  • 关系

节点

节点格式定义了一组字段,用于描述节点实体的操作(createupdatemergedelete)、标签和属性。

表 1. 节点格式字段
字段 必填 描述

op

必填

操作类型。可以是 createmergeupdatedelete 之一。

delete 操作仅适用于单个节点。它不用于从 JSON 运行通用 Cypher 查询。

properties

当操作为 delete 时为可选

附加到节点的属性。

ids

当操作为 create 时为可选

包含将用于查找实体的主/唯一键属性。如果您使用 _elementId(或 _id)作为属性名,它将使用 Neo4j 的内部元素 ID 进行节点查找。

如果您在 merge 操作中使用 _elementId(或 _id),它将表现为简单的更新,即如果给定元素 ID 的节点不存在,则不会创建它。

labels

可选

附加到节点的标签。

尽管 Neo4j 允许创建不带标签的节点,但从性能角度来看不建议这样做。

type

必填

实体类型:node

detach

可选

当操作为 delete 时,您可以指定是否执行“分离删除”

如果未提供值,默认值为 false

示例

  • CREATE 操作示例;

    {
      "type": "node",
      "op": "create",
      "labels": ["Foo", "Bar"],
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    这将转换为以下 Cypher 查询

    CREATE (n:Foo:Bar) SET n = $properties
  • UPDATE 操作示例;

    {
      "type": "node",
      "op": "update",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • MERGE 操作示例;

    {
      "type": "node",
      "op": "merge",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    这将转换为以下 Cypher 查询

    MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • DELETE 操作示例;

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) DELETE n
  • 带 detach trueDELETE 操作示例;

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "detach": true
    }

    这将转换为以下 Cypher 查询

    MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n

关系

关系格式定义了一组字段,用于描述操作(createupdatemergedelete)、关系类型、源节点和目标节点引用以及属性。

表 2. 关系格式字段
字段 必填 描述

op

必填

操作类型。可以是 createmergeupdatedelete 之一。

properties

当操作为 delete 时为可选

附加到关系的属性。

rel_type

必填

关系类型。

ids

可选

包含将用于查找关系的主/唯一键属性。

from

必填

包含有关关系源节点的信息。op 字段只能是 mergematch,默认为 match。有关 idslabels 字段的描述,请参见上方描述的节点字段。

如果在 ids 中使用 _elementId(或 _id)字段引用,则可以省略 labels 字段。

to

必填

包含有关关系目标节点的信息。op 字段只能是 mergematch,默认为 match。有关 idslabels 字段的描述,请参见上方描述的节点字段。

如果在 ids 中使用 _elementId(或 _id)字段引用,则可以省略 labels 字段。

type

必填

实体类型:relationship

示例

  • CREATE 操作示例;

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • merge 源节点的 CREATE 操作示例;

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        },
        "op": "merge"
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "match"
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MERGE (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • UPDATE 操作示例;

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • relationship idsUPDATE 操作示例;

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • MERGE 操作示例;

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • relationship idsMERGE 操作示例;

    {
      "type": "relationship",
      "op": "MERGE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • DELETE 操作示例;

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    DELETE r
  • relationship idsDELETE 操作示例;

    {
      "type": "relationship",
      "op": "DELETE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "ids": {
        "id": 5
      }
    }

    这将转换为以下 Cypher 查询

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    DELETE r
© . All rights reserved.