接收器:从 Kafka 消费记录
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。 Kafka Connect Neo4j 连接器的最新版本可以在这里找到 这里。 |
本章介绍了不同的数据摄取策略,何时使用它们以及何时避免使用它们。
关于批量导入 Neo4j 的重要信息
因为写入关系需要获取两个关联节点的锁,所以通常在对 Neo4j 进行高性能批量插入时,我们希望使用单线程批量,并且我们不希望并行化加载过程。如果在没有仔细设计的情况下并行化加载,大多数情况下会得到线程等待和锁异常/错误的组合。
因此,总体方法应该是将记录批次序列化为“一个接一个”的事务流,以最大化吞吐量。
一般原则
在您开始使用导入方法之前!

-
使用 Kafka!不要将 Neo4j 用于所有事情:通常,最好使用 Kafka 和 KSQL 等工具来重新塑造、转换和操作流,在它到达 Neo4j 之前。在此阶段,您拥有更多灵活的选择,并且整体性能更好。回顾一下“图 ETL”部分。尝试使 neo4j-streams 只执行转换和加载部分,因为它们对于图是不可避免的。
-
不要在 Cypher 中执行复杂的 ETL:避免在将传入消息转换为 Neo4j 时执行复杂转换的情况。可以使用纯 Cypher 来完成,但基本上您将推迟“Kafka 消息清理”任务并委托给 Neo4j 执行,如果可以避免,这不是一个好策略。
-
KISS 原则:输入格式越简单,架构越好/越干净越易于演变,Neo4j 需要做的工作就越少。
-
保持主题一致性:应尽可能避免包含各种不同格式消息的主题。它们将非常难以处理,除非您绝对必须这样做,否则不应尝试。
在您无法控制主题或其内容的受限客户环境中,这可能会变得具有挑战性,因此这仅作为一组原则提出,而不是硬性规则。
变更数据捕获 - CDC
-
需要一个特定的 Debezium JSON 格式,该格式指定另一个系统事务中发生了哪些更改
-
极其重要:对于 CDC 用法,消息顺序很重要,因此建议使用单个 Kafka 主题分区——但正因为如此,吞吐量可能会受到影响,因为并行化不是一种选择。
-
包含两个子策略:SourceID 和 Schema 策略,它们适用于特定的 CDC 案例,并允许您以特定的方式合并数据。只有在您拥有 Debezium 格式数据时,这些子策略才有趣。
-
何时使用
-
尝试镜像来自另一个知道如何以 Debezium 格式发布数据的数据库的更改
-
-
何时避免
-
主题上的自定义消息格式,或来自其他应用程序而不是数据库的消息。
-
-
其他注意事项和约束
-
SourceId 可能容易受到内部 ID 重用的影响,这可能是由于删除或备份还原造成的。除非您对 ID 的唯一性有严格的约束,否则在使用此方法时请谨慎。
-
Schema 策略依赖于唯一性约束,这些约束不能用于关系属性。这意味着关系键实际上是源节点 + 目标节点 + 关系类型,这反过来意味着模型在两个节点之间不能有多个相同类型的边。
-
模式策略
-
这包含两个子策略:节点和关系策略。
-
模式策略始终假设传入消息表示 1 个且仅 1 个节点或关系。
-
所有子策略(节点和关系)的细节都涉及导入哪些部分以及导入到哪个节点标签或关系类型。
-
何时使用
-
最适合高容量、非常细粒度的导入,您希望从中提取消息中的特定属性。
-
-
何时避免
-
您有大型复杂的 JSON 文档,您无法控制它们,它们表示节点和边的混合。
-
CUD 策略
-
这类似于 Debezium CDC 格式。CUD 代表“创建、更新、删除”。这是一种通过 Kafka 向 Neo4j“发送命令”以创建、更新或删除节点或关系的方法
-
何时使用
-
当您可以将复杂的上游消息重新格式化为(在 Kafka 中)一系列简单的命令时,这是一个不错的选择。这是一种“将复杂的东西变得简单”的好方法
-
您需要将更改从读取副本流回核心。例如,如果您在读取副本上运行 GDS 算法,然后希望更新属性以设置社区标识符,则无法在读取副本上执行此操作。但是,您可以从 Kafka 中的读取副本发布 CUD 消息,然后将核心设置为在特定主题上摄取该消息。换句话说——这是一种用于 Neo4j 集群间通信的方法。
-
-
何时避免
-
此策略需要严格约束的数据格式,因此当您无法控制主题时,它不适用。
-
Cypher 模板策略
-
这是处理任何传入消息和任何 Cypher 语句的通用通用策略。
-
它功能最强大,也最难配置,因为它要求您在配置文件中编写 Cypher。
-
何时使用
-
当您无法控制输入主题,并且需要转换输入数据而不是将其加载到结构中时,它最适合。
-
-
何时避免
-
Cypher 查询变得越来越长和复杂,或者需要频繁更改。这表明将过多的提取和转换工作推给了 Neo4j,并且需要流转换器。
-