Confluent Cloud 快速入门

此页面包含有关第三方平台用法的说明,这些用法可能受超出我们控制的更改的影响。如有疑问,请参阅第三方平台文档。

Neo4j 通过作为自定义连接器运行的 Confluent Neo4j 连接器支持 Confluent Cloud。Confluent 的自定义连接器提供了一种方法,可以将 Confluent Cloud 扩展到其平台上提供的完全托管连接器之外。

先决条件

cc cluster
图 1. Confluent Cloud 中的正在运行的集群

上传自定义连接器

在创建连接器实例之前,我们首先需要将 Neo4j Kafka 连接器定义为自定义连接器。

  1. 选择要将连接器安装到的集群,打开 连接器 部分并点击 添加插件

  2. 点击 添加插件 并填写如下所示的新自定义连接器的详细信息,然后接受条件并点击 提交

    连接器插件名称

    Confluent 源 Neo4j 连接器

    自定义插件描述

    作为自定义连接器的 Confluent 源 Neo4j 连接器插件。

    连接器类

    org.neo4j.connectors.kafka.source.Neo4jConnector

    连接器类型

    连接器归档

    按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机中选择下载的 neo4j-kafka-connector-5.1.5.zip 文件。

    敏感属性

    为了在连接器实例中保护敏感配置属性,您应该将至少以下配置属性标记为敏感。

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

它将上传归档文件并创建源插件。

接收器

  1. 选择要将连接器安装到的集群,打开 连接器 部分并点击 添加插件

  2. 点击 添加插件 并填写如下所示的新自定义连接器的详细信息,然后接受条件并点击 提交

    连接器插件名称

    Confluent 接收器 Neo4j 连接器

    自定义插件描述

    作为自定义连接器的 Confluent 接收器 Neo4j 连接器插件。

    连接器类

    org.neo4j.connectors.kafka.sink.Neo4jConnector

    连接器类型

    接收器

    连接器归档

    按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机中选择下载的 neo4j-kafka-connector-5.1.5.zip 文件。

    敏感属性

    为了在连接器实例中保护敏感配置属性,您应该将至少以下配置属性标记为敏感。

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

它将上传归档文件并创建接收器插件。

自定义连接器是由用户创建的 Kafka Connect 插件、修改后的开源连接器插件或第三方连接器插件(如 Confluent Neo4j 连接器)。在 Confluent 文档 中了解有关自定义连接器的更多信息,并阅读我们的 博文,其中包含有关如何使用 Aura 设置此内容的示例。

您可以按照 Neo4j 开发者博客 上的说明,从 Confluent Cloud 创建到 Neo4j 和 Neo4j AuraDB 的连接器。

创建源实例

在上一节中创建了自定义连接器后,我们现在可以开始配置源实例。

  1. 在 Confluent Cloud 中,转到集群的连接器部分,并搜索我们在 上面 创建的插件 Confluent 源 Neo4j 连接器

  2. 点击连接器以开始配置我们的源连接器实例。

  3. 配置用于访问 Kafka 集群的 API 密钥,然后点击继续。

  4. 首先点击 自动配置模式注册表 并根据您的偏好选择 JSON 模式AvroProtobuf 中的任何一个,然后点击 应用更改。这将生成一些模式支持的配置选项。接下来,将连接器配置选项配置为单独的键值对或添加到现有的 JSON 中。为了快速入门,我们将配置源实例,以便它将匹配模式 (:TestSource) 的节点上的更改事件消息发送到名为 createsupdatesdeletes 的主题,使用您首选的序列化格式。

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }

    验证所有配置选项是否正确,然后点击 继续

  5. 在下一个屏幕中,我们需要添加连接端点,以便连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号为 7687。例如,对于连接 URI neo4j+s://<redacted>.databases.neo4j.io,我们应输入 <redacted>.databases.neo4j.io:7687 作为端点。

  6. 接下来,选择连接器应运行多少个任务,然后点击 继续。源连接器始终使用 1 个任务运行,因此 1 的默认值就足够了。

  7. 最后,命名连接器实例,查看您的设置并点击 继续

  8. 源实例将在几分钟内完成预配,并在几分钟后显示为 正在运行

现在您拥有了一个正在运行的源实例,您可以在 Neo4j 中创建以下节点

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

这将导致将新消息发布到名为 creates 的主题。

创建接收器实例

在上一节中创建了源实例后,我们现在可以开始配置接收器实例,以便我们可以对源实例生成的消息采取行动。

  1. 在 Confluent Cloud 中,转到集群的连接器部分,并搜索我们在 上面 创建的插件 Confluent 接收器 Neo4j 连接器

  2. 点击连接器以开始配置我们的接收器连接器实例。

  3. 配置用于访问 Kafka 集群的 API 密钥,然后点击继续。

  4. 首先点击 自动配置模式注册表 并根据您的偏好选择 JSON 模式AvroProtobuf 中的任何一个,然后点击 应用更改。这将生成一些模式支持的配置选项。接下来,将连接器配置选项配置为单独的键值对或添加到现有的 JSON 中。为了快速入门,我们将配置接收器实例,以便它将为从名为 createsupdatesdeletes 的主题接收到的每条消息执行 Cypher 语句。

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }

    验证所有配置选项是否正确,然后点击 继续

  5. 在下一个屏幕中,我们需要添加连接端点,以便连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号为 7687。例如,对于连接 URI neo4j+s://<redacted>.databases.neo4j.io,我们应输入 <redacted>.databases.neo4j.io:7687 作为端点。

  6. 接下来,选择连接器应运行多少个任务,然后点击 继续

  7. 最后,命名连接器实例,查看您的设置并点击 继续

  8. 接收器实例将在几分钟内完成预配,并在几分钟后显示为 正在运行

测试

现在您可以访问您的 Confluent Cloud 集群,并验证是否至少创建了连接器配置中指定的 creates 主题。

在源和接收器连接器都运行的情况下,先前创建的 :TestSource 节点将导致源实例将消息发布到 creates 主题。然后,接收器实例将使用这些消息,并在 Neo4j 中创建相应的 :Person:Family 节点。当您创建、更新和删除标记为 TestSource 的节点时,updatesdeletes 主题也将被创建。

通过在 http://localhost:7474/browser/ 上的 Neo4j 浏览器中执行以下查询来检查情况是否如此

MATCH (n:(Person | Family)) RETURN n

您现在可以通过执行更多类似以下的语句来创建、更新或删除 Person 和 Family 节点

创建一个新的人
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

验证是否创建了一个新的 Person 节点和一个新的 Family 节点并将其链接在一起。

更新现有的人
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

验证现有的 Person 节点是否已更新为姓氏为 smith 并链接到一个新的 Family 节点。

删除现有的人
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

验证现有的 Person 节点是否已删除。

总结

在本快速入门中,我们展示了如何配置 AuraDB/Neo4j 数据库以同时充当 Kafka 主题的消息源和这些相同消息的接收器,以在数据库中创建、更新或删除节点和关系。通常,我们的连接器用作接收器(当通过 Confluent 从其他数据源提取数据时)或用作源(当 Confluent 将数据推送到其他数据库时)。

故障排除

  • 确保您已在数据库上启用了 CDC。

  • 确保您已正确设置了如上所述的连接端点。

  • 检查连接器的日志。

请注意,Confluent Cloud 中的自定义连接器日志可能不会立即可用。在测试连接器实例上的配置更改时,请记住这一点。