快速入门

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版本之后将不再受支持。

Kafka Connect Neo4j 连接器的最新版本可以在此处找到。

Neo4j Streams 插件

任何以streams.开头的配置选项都控制插件本身的行为。有关可用选项的完整列表,请参阅发送器接收器的文档子部分。

安装插件

Kafka 设置

任何以kafka.开头的配置选项都将传递给底层的 Kafka 驱动程序。Neo4j Streams 使用官方的 Confluent Kafka 生产者和消费者 Java 客户端。这些连接器有效的配置设置也适用于 Neo4j Streams。

例如,在下面链接的 Kafka 文档中,名为batch.size的配置设置在 Neo4j Streams 中应写为kafka.batch.size

以下是一些您可能希望使用的常见配置设置。这不是完整的列表。完整的配置选项列表和参考材料可从 Confluent 的网站获取,用于接收器配置发送器配置

表 1. 最常用的配置设置
设置名称 描述 默认值

kafka.max.poll.records

每次从 Kafka 批量拉取的记录的最大数量。增加此数字意味着 Neo4j 内存中会有更大的事务,并且可能会提高吞吐量。

500

kafka.buffer.memory

生产者可以用来缓冲待发送记录的内存总字节数。使用此设置调整插件可能需要多少内存来保存尚未传递到 Neo4j 的消息。

33554432

kafka.batch.size

(仅限生产者) 当多个记录发送到同一分区时,生产者将尝试将记录一起批量处理成更少的请求。这有助于客户端和服务器的性能。此配置控制默认的批量大小(以字节为单位)。

16384

kafka.max.partition.fetch.bytes

(仅限消费者) 服务器将返回的每个分区的最大数据量。记录由消费者批量获取。如果获取到的第一个非空分区的第一个记录批次大于此限制,则仍将返回该批次,以确保消费者可以取得进展。

1048576

kafka.group.id

一个唯一的字符串,用于标识此消费者所属的消费者组。

N/A

配置 Kafka 连接

如果您在本地运行或针对独立机器运行,请将neo4j.conf配置为指向该服务器

neo4j.conf
kafka.bootstrap.servers=localhost:9092

如果您使用的是 Confluent Cloud(托管 Kafka),您可以按照Confluent Cloud部分中的说明连接到 Kafka

确定:接收器、发送器或两者

配置 neo4j-streams 分为三个不同的部分,具体取决于您的需求

  1. 必需:配置与 Kafka 的连接

neo4j.conf
kafka.bootstrap.servers=localhost:9092
  1. 可选:配置 Neo4j 以将记录发送到 Kafka (发送器)

  2. 可选:配置 Neo4j 以从 Kafka 导入数据 (接收器)

根据您的用例和需求遵循一个或两个子部分

接收器

通过添加以下配置,将数据从 Kafka 获取并存储在 Neo4j 中(Neo4j 作为数据消费者):

neo4j.conf
streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

这将处理到达my-ingest-topic的每条消息,并使用给定的 Cypher 语句。当该 Cypher 语句执行时,引用的event变量将设置为接收到的消息,因此此示例 Cypher 将在图中创建一个(:Label)节点,并使用给定的 ID,复制源消息中的所有属性。

有关此处可以执行的操作的完整详细信息,请参阅文档的接收器部分。

发送器

通过添加以下配置,生成来自 Neo4j 的数据并将其发送到 Kafka 主题(Neo4j 作为数据生产者):

neo4j.conf
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000

这会将所有标记为(:Person)的图节点发送到主题my-nodes-topic,并将所有类型为-[:BELONGS-TO]→的关系发送到名为my-rels-topic的主题。此外,架构更改将每 10,000 毫秒轮询一次,这会影响数据库获取新索引/架构更改的速度。请注意,如果未为streams.source.schema.polling.interval属性指定值,则 Streams 插件将使用 300,000 毫秒作为默认值。

表达式Person{*}BELONGS-TO{*}模式。您可以在模式部分找到有关如何更改它们的文档。

有关此处可以执行的操作的完整详细信息,请参阅文档的发送器部分。

重启 Neo4j

安装并配置插件后,重新启动数据库将使其处于活动状态。如果您已配置 Neo4j 从 Kafka 消费数据,它将立即开始处理消息。

将 Neo4j Streams 插件的最新版本安装到 Neo4j 4.x 中时,查看日志您可能会发现类似以下内容

2020-03-25 20:13:50.606+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.partition.fetch.bytes
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.include.messages
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.auto.offset.reset
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.poll.records
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.source.enabled
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.cypher.boa.to.kafkaTest
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.tolerance
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.headers.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.header.prefix
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.topic.name
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled.to.kafkaTest

这些不是错误。它们来自新的 Neo4j 4 配置系统,该系统警告它无法识别这些属性。尽管有这些警告,插件仍将正常工作。