接收器:从 Kafka 消费记录

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。

Kafka Connect Neo4j 连接器的最新版本可以在这里找到 这里

本章介绍了不同的数据摄取策略,何时使用它们以及何时避免使用它们。

关于批量导入 Neo4j 的重要信息

因为写入关系需要获取两个关联节点的锁,所以通常在对 Neo4j 进行高性能批量插入时,我们希望使用单线程批量,并且我们不希望并行化加载过程。如果在没有仔细设计的情况下并行化加载,大多数情况下会得到线程等待和锁异常/错误的组合。

因此,总体方法应该是将记录批次序列化为“一个接一个”的事务流,以最大化吞吐量。

概述

unwind consume

这显示了 Cypher 数据摄取策略。尽管这是一个高级视图 - 我们从 Kafka 获取消息并将其写入 Neo4j,但这并不是全部。

一般原则

在您开始使用导入方法之前!

transformer architecture
  • 使用 Kafka!不要将 Neo4j 用于所有事情:通常,最好使用 Kafka 和 KSQL 等工具来重新塑造、转换和操作流,它到达 Neo4j 之前。在此阶段,您拥有更多灵活的选择,并且整体性能更好。回顾一下“图 ETL”部分。尝试使 neo4j-streams 只执行转换和加载部分,因为它们对于图是不可避免的。

  • 不要在 Cypher 中执行复杂的 ETL:避免在将传入消息转换为 Neo4j 时执行复杂转换的情况。可以使用纯 Cypher 来完成,但基本上您将推迟“Kafka 消息清理”任务并委托给 Neo4j 执行,如果可以避免,这不是一个好策略。

  • KISS 原则:输入格式越简单,架构越好/越干净越易于演变,Neo4j 需要做的工作就越少。

  • 保持主题一致性:应尽可能避免包含各种不同格式消息的主题。它们将非常难以处理,除非您绝对必须这样做,否则不应尝试。

在您无法控制主题或其内容的受限客户环境中,这可能会变得具有挑战性,因此这仅作为一组原则提出,而不是硬性规则。

变更数据捕获 - CDC

  • 需要一个特定的 Debezium JSON 格式,该格式指定另一个系统事务中发生了哪些更改

  • 极其重要:对于 CDC 用法,消息顺序很重要,因此建议使用单个 Kafka 主题分区——但正因为如此,吞吐量可能会受到影响,因为并行化不是一种选择。

  • 包含两个子策略:SourceID 和 Schema 策略,它们适用于特定的 CDC 案例,并允许您以特定的方式合并数据。只有在您拥有 Debezium 格式数据时,这些子策略才有趣。

  • 何时使用

    • 尝试镜像来自另一个知道如何以 Debezium 格式发布数据的数据库的更改

  • 何时避免

    • 主题上的自定义消息格式,或来自其他应用程序而不是数据库的消息。

  • 其他注意事项和约束

    • SourceId 可能容易受到内部 ID 重用的影响,这可能是由于删除或备份还原造成的。除非您对 ID 的唯一性有严格的约束,否则在使用此方法时请谨慎。

    • Schema 策略依赖于唯一性约束,这些约束不能用于关系属性。这意味着关系键实际上是源节点 + 目标节点 + 关系类型,这反过来意味着模型在两个节点之间不能有多个相同类型的边。

模式策略

  • 这包含两个子策略:节点和关系策略。

  • 模式策略始终假设传入消息表示 1 个且仅 1 个节点或关系。

  • 所有子策略(节点和关系)的细节都涉及导入哪些部分以及导入到哪个节点标签或关系类型。

  • 何时使用

    • 最适合高容量、非常细粒度的导入,您希望从中提取消息中的特定属性。

  • 何时避免

    • 您有大型复杂的 JSON 文档,您无法控制它们,它们表示节点和边的混合。

CUD 策略

  • 这类似于 Debezium CDC 格式。CUD 代表“创建、更新、删除”。这是一种通过 Kafka 向 Neo4j“发送命令”以创建、更新或删除节点或关系的方法

  • 何时使用

    • 当您可以将复杂的上游消息重新格式化为(在 Kafka 中)一系列简单的命令时,这是一个不错的选择。这是一种“将复杂的东西变得简单”的好方法

    • 您需要将更改从读取副本流回核心。例如,如果您在读取副本上运行 GDS 算法,然后希望更新属性以设置社区标识符,则无法在读取副本上执行此操作。但是,您可以从 Kafka 中的读取副本发布 CUD 消息,然后将核心设置为在特定主题上摄取该消息。换句话说——这是一种用于 Neo4j 集群间通信的方法。

  • 何时避免

    • 此策略需要严格约束的数据格式,因此当您无法控制主题时,它不适用。

Cypher 模板策略

  • 这是处理任何传入消息和任何 Cypher 语句的通用通用策略。

  • 它功能最强大,也最难配置,因为它要求您在配置文件中编写 Cypher。

  • 何时使用

    • 当您无法控制输入主题,并且需要转换输入数据而不是将其加载到结构中时,它最适合。

  • 何时避免

    • Cypher 查询变得越来越长和复杂,或者需要频繁更改。这表明将过多的提取和转换工作推给了 Neo4j,并且需要流转换器。

并行性

插件始终以顺序批处理事务运行,连接到 Kafka 客户端的单个轮询。

单个 Kafka 客户端线程按以下方式进行:* 从 Kafka 获取记录 * 形成一批“事件” * 将它们写入 Neo4j,通常采用以下形式:UNWIND events AS event

有关 poll() 操作的工作原理的完整详细信息,请参阅 Kafka 文档中的信息。

大规模加载多种数据

如果我们想以高性能的方式加载 8 种类型的节点和关系,我们该怎么做?

一般建议

  • 将您要加载的数据分解成多个不同的主题,每个主题一个分区,而不是拥有一个包含混合/大型内容的大型主题

  • 使用流转换器和 KSQL 技术将消息构建成可以使用除 Cypher 模板之外的其他数据摄取策略的格式,以简化您需要编写的代码,并避免由于 Cypher 查询更改而需要循环集群。

  • 尝试调整批次大小和内存大小,直到获得良好的吞吐量