影响吞吐量的因素

Kafka Connect Neo4j Connector 是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 已经不再处于积极开发中,并且在 Neo4j 4.4 版本之后将不再受支持。

Kafka Connect Neo4j Connector 的最新版本可以在这里找到 here.

这主要与从 Kafka → Neo4j 接收记录有关,但许多相同的原则也适用于向 Kafka 生成记录。

Neo4j 设置

一般来说,Neo4j-Streams 使用以下策略向 Neo4j 写入数据

unwind consume
UNWIND events AS event
(your cypher here)

因此,一个关键因素是每个事务中“events”数组中包含多少项。

  • 堆大小会影响来自 Kafka 的事务可以有多大,而不会使其他正在运行的查询饿死。

  • 页面缓存大小会影响有多少数据处于热状态,并对 neo4j-streams 运行的 cypher 查询产生重大影响。

Kafka 消费者设置

一个主要因素是您每次从 Kafka 中提取多少数据,以及这些数据是如何转化为一批记录的。Neo4j-Streams 使用 Kafka 的官方 Java 客户端与消息队列进行通信,并运行 Kafka 中的 poll() 操作。配置中以 kafka.* 开头的设置适用于此客户端,并控制其轮询方式、使用多少内存、批次有多大,等等。配置不当会导致严重的性能问题,例如,如果您每次轮询操作只获得 1 条记录,那么您将以 1 批的规模向 Neo4j 写入数据,从而最大限度地延长您花费在事务开销上的总时间。

有关 Kafka 消费者工作原理的更多信息,请参阅此优秀文档。特别是查看“The Poll Loop”部分,这正是 Neo4j-Streams 在内部执行的操作。

一些关于 Neo4j 的一般性说明

  • 批次大小 (neo4j.batch.size) - 单个事务批次中包含的消息数量。

  • 最大轮询记录 (kafka.max.poll.records) - Neo4j 中每个事务使用的记录数量。在内存使用和总事务开销之间存在权衡。

    • 较少的大型批次可以更快地将数据导入 Neo4j,但需要更多内存。

    • 有效载荷越小,批次越大(通过 unwind)。一个默认值是 1000,然后逐渐增加。如果您只是通过 unwind 创建节点,那么您可以设置更高的值(以 20k 为起点)。然后对于关系合并,使用较低的数字(回到 1000-5000)。每个批次代表一个内存中的事务,请记住消息大小 * 批次大小是确定您事务需要多少堆内存的重要因素。

  • 提取字节 (kafka.max.partition.fetch.bytes) 服务器将返回的每个分区的最大数据量。记录由消费者以批次提取。如果第一个非空分区的第一个记录批次大于此限制,则该批次仍将被返回以确保消费者可以继续执行。

每次 kafka 客户端调用 poll() 操作时,都会受到这些因素的限制。第一个是您可以提取的最大字节数,以限制内存开销。第二个是您可能希望在一个批次中包含多少条记录。请注意,在此层面上,您不知道每个记录有多少字节。批次大小的默认值为 1mb。因此,假设您的记录大小为 200kb(大型 json 文件)。如果您将批次大小保留为默认的 1mb,那么您将永远不会在每个事务中拥有超过 5 条记录。最大轮询记录会限制另一个方面。最后,您可能希望在高级场景中读取或调整 kafka.max.poll.interval.ms 以限制花费在轮询上的时间。请参阅此文档,以获取有关该设置的更多信息。

一个合乎逻辑的设置可能是将最大轮询记录设置为您的预期事务批次大小,将 neo4j.batch.size 设置为相同的数字。一般来说,您可以保持 kafka.max.partition.fetch.bytes 的默认值不变,但如果您需要出于内存原因对其进行调整,则它应该等于最大轮询记录 * 平均每个记录的字节数,再加上 10% 左右。

(通过在配置中加上“kafka.”前缀,在 neo4j-streams 中使用这些配置)

Kafka 分区策略

一个重要的因素可能是 Kafka 主题的设置方式。请参阅如何在 Kafka 集群中选择主题/分区的数量.

kafka partitions

以下是一些关键观察结果

  • Kafka 主题中的分区越多,总体的吞吐量就越好

  • 如果端到端消息排序至关重要(例如在 CDC 设置中),则绝对需要单个分区

  • 分区越多,需要的内存和并发性就越多

网络

  • 到 Kafka 服务器的高延迟或低吞吐量可能会成为问题