Confluent Cloud 快速入门

本页面包含有关第三方平台使用方法的说明,这些说明可能会发生超出我们控制的更改。如有疑问,请参阅第三方平台文档。

Neo4j 支持 Confluent Cloud,其 Neo4j Connector for Confluent 作为自定义连接器运行。Confluent 的自定义连接器提供了一种将 Confluent Cloud 扩展到其平台上现有完全托管连接器之外的方式。

先决条件

cc cluster
图 1. Confluent Cloud 中的运行中集群

上传自定义连接器

在创建连接器实例之前,我们首先需要将适用于 Kafka 的 Neo4j 连接器定义为自定义连接器。

  1. 选择要安装连接器的集群,打开“Connectors”部分,然后点击“Add plugin”。

  2. 点击“Add Plugin”,按照以下所示填写新自定义连接器的详细信息,然后接受条件并点击“Submit”。

    连接器插件名称

    适用于 Confluent 的 Neo4j 源连接器

    自定义插件描述

    适用于 Confluent 的 Neo4j 源连接器插件作为自定义连接器。

    连接器类

    org.neo4j.connectors.kafka.source.Neo4jConnector

    连接器类型

    连接器归档文件

    通过遵循分发说明获取最新的 Confluent Hub 组件归档包,并从您的本地计算机中选择已下载的 `neo4j-kafka-connector-5.1.12.zip` 文件。

    敏感属性

    为了保护连接器实例中的敏感配置属性,您应该至少将以下配置属性标记为敏感。

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

它将上传归档文件并创建源插件。

目标

  1. 选择要安装连接器的集群,打开“Connectors”部分,然后点击“Add plugin”。

  2. 点击“Add Plugin”,按照以下所示填写新自定义连接器的详细信息,然后接受条件并点击“Submit”。

    连接器插件名称

    适用于 Confluent 的 Neo4j 目标连接器

    自定义插件描述

    适用于 Confluent 的 Neo4j 目标连接器插件作为自定义连接器。

    连接器类

    org.neo4j.connectors.kafka.sink.Neo4jConnector

    连接器类型

    目标

    连接器归档文件

    通过遵循分发说明获取最新的 Confluent Hub 组件归档包,并从您的本地计算机中选择已下载的 `neo4j-kafka-connector-5.1.12.zip` 文件。

    敏感属性

    为了保护连接器实例中的敏感配置属性,您应该至少将以下配置属性标记为敏感。

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

它将上传归档文件并创建目标插件。

自定义连接器是用户创建的 Kafka Connect 插件、修改的开源连接器插件或第三方连接器插件,例如适用于 Confluent 的 Neo4j 连接器。在Confluent 文档中了解更多关于自定义连接器的信息,并阅读我们的博客文章,其中包含一个关于如何使用 Aura 进行设置的示例。

您可以通过遵循Neo4j 开发者博客上的说明,从 Confluent Cloud 创建连接器到 Neo4j 和Neo4j AuraDB

创建源实例

在上节中创建了自定义连接器后,我们现在可以开始配置我们的源实例。

  1. 在 Confluent Cloud 中,转到您的集群的“Connectors”部分,并搜索我们之前在上面创建的插件“适用于 Confluent 的 Neo4j 源连接器”。

  2. 点击连接器以开始配置我们的源连接器实例。

  3. 配置用于访问 Kafka 集群的 API 密钥,然后点击“继续”。

  4. 首先点击“自动配置 Schema Registry”,并根据您的偏好选择“JSON Schema”、“Avro”或“Protobuf”中的一个,然后点击“应用更改”。这将生成几个用于模式支持的配置选项。接下来,以单独的键值对形式或添加到现有 JSON 中配置连接器配置选项。为了快速入门,我们将配置我们的源实例,使其将与模式 (:TestSource) 匹配的节点上的更改事件消息发送到名为 createsupdatesdeletes 的主题,使用您首选的序列化格式。

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }

    验证所有配置选项是否正确,然后点击“继续”。

  5. 在下一个屏幕中,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号是 7687。例如,对于连接 URI neo4j+s://<redacted>.databases.neo4j.io,我们应该输入 <redacted>.databases.neo4j.io:7687 作为端点。

  6. 接下来,选择您的连接器应运行的任务数量,然后点击“继续”。源连接器总是运行 1 个任务,因此默认值 1 就足够了。

  7. 最后,命名您的连接器实例,审查您的设置,然后点击“继续”。

  8. 源实例将在几分钟内配置完成并显示为“运行中”。

现在您有了正在运行的源实例,可以在 Neo4j 中创建以下节点:

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

这将导致新消息发布到名为 creates 的主题。

创建目标实例

在上节中创建了源实例后,我们现在可以开始配置我们的目标实例,以便能够处理源实例生成的消息。

  1. 在 Confluent Cloud 中,转到您的集群的“Connectors”部分,并搜索我们之前在上面创建的插件“适用于 Confluent 的 Neo4j 目标连接器”。

  2. 点击连接器以开始配置我们的目标连接器实例。

  3. 配置用于访问 Kafka 集群的 API 密钥,然后点击“继续”。

  4. 首先点击“自动配置 Schema Registry”,并根据您的偏好选择“JSON Schema”、“Avro”或“Protobuf”中的一个,然后点击“应用更改”。这将生成几个用于模式支持的配置选项。接下来,以单独的键值对形式或添加到现有 JSON 中配置连接器配置选项。为了快速入门,我们将配置我们的目标实例,使其对从名为 createsupdatesdeletes 的主题接收到的每条消息执行 Cypher 语句。

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }

    验证所有配置选项是否正确,然后点击“继续”。

  5. 在下一个屏幕中,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号是 7687。例如,对于连接 URI neo4j+s://<redacted>.databases.neo4j.io,我们应该输入 <redacted>.databases.neo4j.io:7687 作为端点。

  6. 接下来,选择您的连接器应运行的任务数量,然后点击“继续”。

  7. 最后,命名您的连接器实例,审查您的设置,然后点击“继续”。

  8. 目标实例将在几分钟内配置完成并显示为“运行中”。

测试

现在您可以访问您的 Confluent Cloud 集群,并验证至少已按照连接器配置创建了 creates 主题。

在源连接器和目标连接器都运行的情况下,之前创建的 :TestSource 节点将导致源实例将消息发布到 creates 主题。这些消息将由目标实例消费,并在 Neo4j 中创建相应的 :Person:Family 节点。当您创建、更新和删除带有 TestSource 标签的节点时,updatesdeletes 主题也将被创建。

通过在 Neo4j Browser (http://localhost:7474/browser/) 中执行以下查询来检查是否如此:

MATCH (n:(Person | Family)) RETURN n

您现在可以通过执行更多语句来创建、更新或删除 Person 和 Family 节点,例如:

创建新人员
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

验证是否创建了一个新的 Person 节点和一个新的 Family 节点并已链接在一起。

更新现有人员
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

验证现有 Person 节点是否已更新为姓氏 smith 并链接到新的 Family 节点。

删除现有人员
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

验证现有 Person 节点是否已删除。

总结

在本快速入门中,我们演示了如何将 AuraDB/Neo4j 数据库配置为既作为 Kafka 主题的消息源,又作为这些相同消息的目标,以在数据库中创建、更新或删除节点和关系。通常,我们的连接器用作从 Confluent 拉取其他数据源数据的目标,或者用作 Confluent 推送数据到其他数据库的源。

故障排除

  • 请确保您的数据库已启用 CDC。

  • 请确保您已按照上述说明正确设置了连接端点。

  • 检查连接器的日志。

请注意,Confluent Cloud 中的自定义连接器日志可能无法立即获得。在测试连接器实例的配置更改时,记住这一点可能会有所帮助。
© . All rights reserved.