快速入门
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版本之后将不再受支持。 Kafka Connect Neo4j 连接器的最新版本可以在此处找到。 |
Neo4j Streams 插件
安装插件
-
从https://github.com/neo4j-contrib/neo4j-streams/releases/4.1.5下载最新版本 jar 包
-
将其复制到
$NEO4J_HOME/plugins
并配置相关的连接
Kafka 设置
任何以kafka.
开头的配置选项都将传递给底层的 Kafka 驱动程序。Neo4j Streams 使用官方的 Confluent Kafka 生产者和消费者 Java 客户端。这些连接器有效的配置设置也适用于 Neo4j Streams。
例如,在下面链接的 Kafka 文档中,名为batch.size
的配置设置在 Neo4j Streams 中应写为kafka.batch.size
。
设置名称 | 描述 | 默认值 |
---|---|---|
kafka.max.poll.records |
每次从 Kafka 批量拉取的记录的最大数量。增加此数字意味着 Neo4j 内存中会有更大的事务,并且可能会提高吞吐量。 |
500 |
kafka.buffer.memory |
生产者可以用来缓冲待发送记录的内存总字节数。使用此设置调整插件可能需要多少内存来保存尚未传递到 Neo4j 的消息。 |
33554432 |
kafka.batch.size |
(仅限生产者) 当多个记录发送到同一分区时,生产者将尝试将记录一起批量处理成更少的请求。这有助于客户端和服务器的性能。此配置控制默认的批量大小(以字节为单位)。 |
16384 |
kafka.max.partition.fetch.bytes |
(仅限消费者) 服务器将返回的每个分区的最大数据量。记录由消费者批量获取。如果获取到的第一个非空分区的第一个记录批次大于此限制,则仍将返回该批次,以确保消费者可以取得进展。 |
1048576 |
kafka.group.id |
一个唯一的字符串,用于标识此消费者所属的消费者组。 |
N/A |
配置 Kafka 连接
如果您在本地运行或针对独立机器运行,请将neo4j.conf
配置为指向该服务器
kafka.bootstrap.servers=localhost:9092
如果您使用的是 Confluent Cloud(托管 Kafka),您可以按照Confluent Cloud部分中的说明连接到 Kafka
确定:接收器、发送器或两者
配置 neo4j-streams 分为三个不同的部分,具体取决于您的需求
-
必需:配置与 Kafka 的连接
kafka.bootstrap.servers=localhost:9092
根据您的用例和需求遵循一个或两个子部分
接收器
通过添加以下配置,将数据从 Kafka 获取并存储在 Neo4j 中(Neo4j 作为数据消费者):
streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
这将处理到达my-ingest-topic
的每条消息,并使用给定的 Cypher 语句。当该 Cypher 语句执行时,引用的event
变量将设置为接收到的消息,因此此示例 Cypher 将在图中创建一个(:Label)
节点,并使用给定的 ID,复制源消息中的所有属性。
有关此处可以执行的操作的完整详细信息,请参阅文档的接收器部分。
发送器
通过添加以下配置,生成来自 Neo4j 的数据并将其发送到 Kafka 主题(Neo4j 作为数据生产者):
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000
这会将所有标记为(:Person)
的图节点发送到主题my-nodes-topic
,并将所有类型为-[:BELONGS-TO]→
的关系发送到名为my-rels-topic
的主题。此外,架构更改将每 10,000 毫秒轮询一次,这会影响数据库获取新索引/架构更改的速度。请注意,如果未为streams.source.schema.polling.interval
属性指定值,则 Streams 插件将使用 300,000 毫秒作为默认值。
表达式Person{*}
和BELONGS-TO{*}
是模式。您可以在模式部分找到有关如何更改它们的文档。
有关此处可以执行的操作的完整详细信息,请参阅文档的发送器部分。
重启 Neo4j
安装并配置插件后,重新启动数据库将使其处于活动状态。如果您已配置 Neo4j 从 Kafka 消费数据,它将立即开始处理消息。
将 Neo4j Streams 插件的最新版本安装到 Neo4j 4.x 中时,查看日志您可能会发现类似以下内容
这些不是错误。它们来自新的 Neo4j 4 配置系统,该系统警告它无法识别这些属性。尽管有这些警告,插件仍将正常工作。 |