Confluent Cloud 快速入门
此页面包含有关第三方平台用法的说明,这些用法可能受超出我们控制的更改的影响。如有疑问,请参阅第三方平台文档。 |
Neo4j 通过作为自定义连接器运行的 Confluent Neo4j 连接器支持 Confluent Cloud。Confluent 的自定义连接器提供了一种方法,可以将 Confluent Cloud 扩展到其平台上提供的完全托管连接器之外。
先决条件
-
一个 AuraDB Enterprise 或一个 Neo4j Enterprise 本地部署,已启用 CDC,可以通过公共互联网访问。请按照 在 Neo4j Aura 上启用 CDC 或 在 Neo4j 数据库管理系统上启用 CDC 获取说明。
-
必须在 Confluent Cloud 中有一个环境和一个正在运行的集群,类似于 图 1,“Confluent Cloud 中的正在运行的集群” 中所示。

上传自定义连接器
在创建连接器实例之前,我们首先需要将 Neo4j Kafka 连接器定义为自定义连接器。
源
-
选择要将连接器安装到的集群,打开
连接器
部分并点击添加插件
。 -
点击
添加插件
并填写如下所示的新自定义连接器的详细信息,然后接受条件并点击提交
。- 连接器插件名称
-
Confluent 源 Neo4j 连接器
- 自定义插件描述
-
作为自定义连接器的 Confluent 源 Neo4j 连接器插件。
- 连接器类
-
org.neo4j.connectors.kafka.source.Neo4jConnector
- 连接器类型
-
源
- 连接器归档
-
按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机中选择下载的
neo4j-kafka-connector-5.1.5.zip
文件。 - 敏感属性
-
为了在连接器实例中保护敏感配置属性,您应该将至少以下配置属性标记为敏感。
neo4j.authentication.basic.password neo4j.authentication.kerberos.ticket neo4j.authentication.bearer.token neo4j.authentication.custom.credentials
它将上传归档文件并创建源插件。
接收器
-
选择要将连接器安装到的集群,打开
连接器
部分并点击添加插件
。 -
点击
添加插件
并填写如下所示的新自定义连接器的详细信息,然后接受条件并点击提交
。- 连接器插件名称
-
Confluent 接收器 Neo4j 连接器
- 自定义插件描述
-
作为自定义连接器的 Confluent 接收器 Neo4j 连接器插件。
- 连接器类
-
org.neo4j.connectors.kafka.sink.Neo4jConnector
- 连接器类型
-
接收器
- 连接器归档
-
按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机中选择下载的
neo4j-kafka-connector-5.1.5.zip
文件。 - 敏感属性
-
为了在连接器实例中保护敏感配置属性,您应该将至少以下配置属性标记为敏感。
neo4j.authentication.basic.password neo4j.authentication.kerberos.ticket neo4j.authentication.bearer.token neo4j.authentication.custom.credentials
它将上传归档文件并创建接收器插件。
自定义连接器是由用户创建的 Kafka Connect 插件、修改后的开源连接器插件或第三方连接器插件(如 Confluent Neo4j 连接器)。在 Confluent 文档 中了解有关自定义连接器的更多信息,并阅读我们的 博文,其中包含有关如何使用 Aura 设置此内容的示例。
您可以按照 Neo4j 开发者博客 上的说明,从 Confluent Cloud 创建到 Neo4j 和 Neo4j AuraDB 的连接器。
创建源实例
在上一节中创建了自定义连接器后,我们现在可以开始配置源实例。
-
在 Confluent Cloud 中,转到集群的连接器部分,并搜索我们在 上面 创建的插件
Confluent 源 Neo4j 连接器
。 -
点击连接器以开始配置我们的源连接器实例。
-
配置用于访问 Kafka 集群的 API 密钥,然后点击继续。
-
首先点击
自动配置模式注册表
并根据您的偏好选择JSON 模式
、Avro
或Protobuf
中的任何一个,然后点击应用更改
。这将生成一些模式支持的配置选项。接下来,将连接器配置选项配置为单独的键值对或添加到现有的 JSON 中。为了快速入门,我们将配置源实例,以便它将匹配模式(:TestSource)
的节点上的更改事件消息发送到名为creates
、updates
和deletes
的主题,使用您首选的序列化格式。验证所有配置选项是否正确,然后点击
继续
。 -
在下一个屏幕中,我们需要添加连接端点,以便连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号为
7687
。例如,对于连接 URIneo4j+s://<redacted>.databases.neo4j.io
,我们应输入<redacted>.databases.neo4j.io:7687
作为端点。 -
接下来,选择连接器应运行多少个任务,然后点击
继续
。源连接器始终使用 1 个任务运行,因此1
的默认值就足够了。 -
最后,命名连接器实例,查看您的设置并点击
继续
。 -
源实例将在几分钟内完成预配,并在几分钟后显示为
正在运行
。
现在您拥有了一个正在运行的源实例,您可以在 Neo4j 中创建以下节点
CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});
这将导致将新消息发布到名为 creates
的主题。
创建接收器实例
在上一节中创建了源实例后,我们现在可以开始配置接收器实例,以便我们可以对源实例生成的消息采取行动。
-
在 Confluent Cloud 中,转到集群的连接器部分,并搜索我们在 上面 创建的插件
Confluent 接收器 Neo4j 连接器
。 -
点击连接器以开始配置我们的接收器连接器实例。
-
配置用于访问 Kafka 集群的 API 密钥,然后点击继续。
-
首先点击
自动配置模式注册表
并根据您的偏好选择JSON 模式
、Avro
或Protobuf
中的任何一个,然后点击应用更改
。这将生成一些模式支持的配置选项。接下来,将连接器配置选项配置为单独的键值对或添加到现有的 JSON 中。为了快速入门,我们将配置接收器实例,以便它将为从名为creates
、updates
和deletes
的主题接收到的每条消息执行 Cypher 语句。验证所有配置选项是否正确,然后点击
继续
。 -
在下一个屏幕中,我们需要添加连接端点,以便连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号为
7687
。例如,对于连接 URIneo4j+s://<redacted>.databases.neo4j.io
,我们应输入<redacted>.databases.neo4j.io:7687
作为端点。 -
接下来,选择连接器应运行多少个任务,然后点击
继续
。 -
最后,命名连接器实例,查看您的设置并点击
继续
。 -
接收器实例将在几分钟内完成预配,并在几分钟后显示为
正在运行
。
测试
现在您可以访问您的 Confluent Cloud 集群,并验证是否至少创建了连接器配置中指定的 creates
主题。
在源和接收器连接器都运行的情况下,先前创建的 :TestSource
节点将导致源实例将消息发布到 creates
主题。然后,接收器实例将使用这些消息,并在 Neo4j 中创建相应的 :Person
和 :Family
节点。当您创建、更新和删除标记为 TestSource
的节点时,updates
和 deletes
主题也将被创建。
通过在 http://localhost:7474/browser/ 上的 Neo4j 浏览器中执行以下查询来检查情况是否如此
MATCH (n:(Person | Family)) RETURN n
您现在可以通过执行更多类似以下的语句来创建、更新或删除 Person 和 Family 节点
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});
验证是否创建了一个新的 Person
节点和一个新的 Family
节点并将其链接在一起。
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';
验证现有的 Person
节点是否已更新为姓氏为 smith
并链接到一个新的 Family
节点。
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;
验证现有的 Person
节点是否已删除。