来源:将记录生产到 Kafka

Kafka Connect Neo4j Connector 是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发中,并且在 Neo4j 4.4 版之后将不再受支持。

Kafka Connect Neo4j Connector 的最新版本可以找到 这里.

本章介绍了有关如何以高性能方式将数据从 Neo4j 生产到 Kafka 的原则。

一般原则

  • 不要发布所有内容,要仔细考虑要发布哪些内容,以便在插件代码中花费更少的时间

  • 如果您定期进行非常大的事务,请三思而后行,考虑哪些其他策略可供您使用来发布(例如,APOC 触发器 + 自定义 Cypher 代码 + neo4j-streams 过程来“自定义发布”到 Kafka)

TransactionEventHandlers

transaction event handler

我们向 Kafka 生成记录的方式是使用事务事件处理程序。这带来了几个重要的注意事项

  • 在 Neo4j 中,**客户端的事务无法完成提交**,直到所有 TransactionEventHandlers 都已完成(或将工作委托给单独的线程)。这意味着,由于需要对事务进行序列化并将其发送到网络上,使用生产者可能会减慢底层事务的提交速度

  • **非常大的事务可能会出现问题**,因为它们必须序列化为非常大量的数据(例如,单个 JSON 文档),因此必须以一种具有内存管理设置的方式配置插件,以牢记这一点

  • **TransactionEventHandlers 仅在领导者上触发**。因此,如果您为因果集群配置 Neo4j-Streams,则必须为集群中的所有 CORE 成员执行此操作。当 LEADER 在集群中轮换时,事务将始终发布,但这需要在所有 3 个 CORE 节点上进行一致的配置。当事务从 LEADER → FOLLOWER 复制时,事务提交将不会触发,并且不会出现重复的发布。

自定义发布

源插件旨在以 Debezium 格式发布,这与 Kafka 生态系统中针对此类事件的标准数据格式最为接近。但有时您会发现它有约束力,并且您可能需要以自定义格式发布。

截至撰写本文时,最佳选择是编写 Cypher + APOC 触发器的组合。APOC 触发器可以在对系统进行关键更新时有选择地触发,使用与上述相同的 TransactionEventHandler 方法。并且“触发代码”可以是 Cypher,您可以使用它来处理生成的数据,并使用 CALL streams.publish() 将您喜欢的任何消息格式化发送到 Kafka。