Confluent Cloud 快速入门
本页面包含有关第三方平台使用方法的说明,这些说明可能会发生超出我们控制的更改。如有疑问,请参阅第三方平台文档。 |
Neo4j 支持 Confluent Cloud,其 Neo4j Connector for Confluent 作为自定义连接器运行。Confluent 的自定义连接器提供了一种将 Confluent Cloud 扩展到其平台上现有完全托管连接器之外的方式。
先决条件
-
已启用 CDC 的 AuraDB Enterprise 或 Neo4j Enterprise 本地部署,可通过公共互联网访问。请遵循启用 Neo4j Aura 上的 CDC 或启用 Neo4j DBMS 上的 CDC 获取说明。
-
Confluent Cloud 中必须有可用的环境和正在运行的集群,类似于图 1“Confluent Cloud 中的运行中集群”所示。

上传自定义连接器
在创建连接器实例之前,我们首先需要将适用于 Kafka 的 Neo4j 连接器定义为自定义连接器。
源
-
选择要安装连接器的集群,打开“Connectors”部分,然后点击“Add plugin”。
-
点击“Add Plugin”,按照以下所示填写新自定义连接器的详细信息,然后接受条件并点击“Submit”。
- 连接器插件名称
-
适用于 Confluent 的 Neo4j 源连接器
- 自定义插件描述
-
适用于 Confluent 的 Neo4j 源连接器插件作为自定义连接器。
- 连接器类
-
org.neo4j.connectors.kafka.source.Neo4jConnector
- 连接器类型
-
源
- 连接器归档文件
-
通过遵循分发说明获取最新的 Confluent Hub 组件归档包,并从您的本地计算机中选择已下载的 `neo4j-kafka-connector-5.1.12.zip` 文件。
- 敏感属性
-
为了保护连接器实例中的敏感配置属性,您应该至少将以下配置属性标记为敏感。
neo4j.authentication.basic.password neo4j.authentication.kerberos.ticket neo4j.authentication.bearer.token neo4j.authentication.custom.credentials
它将上传归档文件并创建源插件。
目标
-
选择要安装连接器的集群,打开“Connectors”部分,然后点击“Add plugin”。
-
点击“Add Plugin”,按照以下所示填写新自定义连接器的详细信息,然后接受条件并点击“Submit”。
- 连接器插件名称
-
适用于 Confluent 的 Neo4j 目标连接器
- 自定义插件描述
-
适用于 Confluent 的 Neo4j 目标连接器插件作为自定义连接器。
- 连接器类
-
org.neo4j.connectors.kafka.sink.Neo4jConnector
- 连接器类型
-
目标
- 连接器归档文件
-
通过遵循分发说明获取最新的 Confluent Hub 组件归档包,并从您的本地计算机中选择已下载的 `neo4j-kafka-connector-5.1.12.zip` 文件。
- 敏感属性
-
为了保护连接器实例中的敏感配置属性,您应该至少将以下配置属性标记为敏感。
neo4j.authentication.basic.password neo4j.authentication.kerberos.ticket neo4j.authentication.bearer.token neo4j.authentication.custom.credentials
它将上传归档文件并创建目标插件。
自定义连接器是用户创建的 Kafka Connect 插件、修改的开源连接器插件或第三方连接器插件,例如适用于 Confluent 的 Neo4j 连接器。在Confluent 文档中了解更多关于自定义连接器的信息,并阅读我们的博客文章,其中包含一个关于如何使用 Aura 进行设置的示例。
您可以通过遵循Neo4j 开发者博客上的说明,从 Confluent Cloud 创建连接器到 Neo4j 和Neo4j AuraDB。
创建源实例
在上节中创建了自定义连接器后,我们现在可以开始配置我们的源实例。
-
在 Confluent Cloud 中,转到您的集群的“Connectors”部分,并搜索我们之前在上面创建的插件“适用于 Confluent 的 Neo4j 源连接器”。
-
点击连接器以开始配置我们的源连接器实例。
-
配置用于访问 Kafka 集群的 API 密钥,然后点击“继续”。
-
首先点击“自动配置 Schema Registry”,并根据您的偏好选择“JSON Schema”、“Avro”或“Protobuf”中的一个,然后点击“应用更改”。这将生成几个用于模式支持的配置选项。接下来,以单独的键值对形式或添加到现有 JSON 中配置连接器配置选项。为了快速入门,我们将配置我们的源实例,使其将与模式
(:TestSource)
匹配的节点上的更改事件消息发送到名为creates
、updates
和deletes
的主题,使用您首选的序列化格式。验证所有配置选项是否正确,然后点击“继续”。
-
在下一个屏幕中,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号是
7687
。例如,对于连接 URIneo4j+s://<redacted>.databases.neo4j.io
,我们应该输入<redacted>.databases.neo4j.io:7687
作为端点。 -
接下来,选择您的连接器应运行的任务数量,然后点击“继续”。源连接器总是运行 1 个任务,因此默认值
1
就足够了。 -
最后,命名您的连接器实例,审查您的设置,然后点击“继续”。
-
源实例将在几分钟内配置完成并显示为“运行中”。
现在您有了正在运行的源实例,可以在 Neo4j 中创建以下节点:
CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});
这将导致新消息发布到名为 creates
的主题。
创建目标实例
在上节中创建了源实例后,我们现在可以开始配置我们的目标实例,以便能够处理源实例生成的消息。
-
在 Confluent Cloud 中,转到您的集群的“Connectors”部分,并搜索我们之前在上面创建的插件“适用于 Confluent 的 Neo4j 目标连接器”。
-
点击连接器以开始配置我们的目标连接器实例。
-
配置用于访问 Kafka 集群的 API 密钥,然后点击“继续”。
-
首先点击“自动配置 Schema Registry”,并根据您的偏好选择“JSON Schema”、“Avro”或“Protobuf”中的一个,然后点击“应用更改”。这将生成几个用于模式支持的配置选项。接下来,以单独的键值对形式或添加到现有 JSON 中配置连接器配置选项。为了快速入门,我们将配置我们的目标实例,使其对从名为
creates
、updates
和deletes
的主题接收到的每条消息执行 Cypher 语句。验证所有配置选项是否正确,然后点击“继续”。
-
在下一个屏幕中,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其添加为端点。请记住,Neo4j 连接的默认端口号是
7687
。例如,对于连接 URIneo4j+s://<redacted>.databases.neo4j.io
,我们应该输入<redacted>.databases.neo4j.io:7687
作为端点。 -
接下来,选择您的连接器应运行的任务数量,然后点击“继续”。
-
最后,命名您的连接器实例,审查您的设置,然后点击“继续”。
-
目标实例将在几分钟内配置完成并显示为“运行中”。
测试
现在您可以访问您的 Confluent Cloud 集群,并验证至少已按照连接器配置创建了 creates
主题。
在源连接器和目标连接器都运行的情况下,之前创建的 :TestSource
节点将导致源实例将消息发布到 creates
主题。这些消息将由目标实例消费,并在 Neo4j 中创建相应的 :Person
和 :Family
节点。当您创建、更新和删除带有 TestSource
标签的节点时,updates
和 deletes
主题也将被创建。
通过在 Neo4j Browser (http://localhost:7474/browser/) 中执行以下查询来检查是否如此:
MATCH (n:(Person | Family)) RETURN n
您现在可以通过执行更多语句来创建、更新或删除 Person 和 Family 节点,例如:
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});
验证是否创建了一个新的 Person
节点和一个新的 Family
节点并已链接在一起。
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';
验证现有 Person
节点是否已更新为姓氏 smith
并链接到新的 Family
节点。
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;
验证现有 Person
节点是否已删除。