Neo4j Streams 常见问题解答

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发中,并且在 Neo4j 4.4 版本之后将不再受支持。

最新版本的 Kafka Connect Neo4j 连接器可以在 这里 找到。

源代码许可证

Neo4j Streams 的源代码根据 Apache 许可证 2.0 版的条款提供。有关完整条款和条件,请参见源代码存储库中的 LICENSE 文件。

关于 CUD 文件格式

CUD 文件格式是 JSON 文件,它表示图实体(节点/关系)以及如何管理它们(在 **C**reate/**U**pdate/**D**elete 操作方面)。因此,每个 JSON 事件都表示单个操作。有关如何使用这些文件的更多详细信息,请查看 Neo4j Streams 插件的 CUD 文件格式 部分。

如何使用 CDC Schema 策略来摄取事件

更改数据捕获方法允许在不同的 Neo4j 实例之间摄取事件。如果您决定使用 Neo4j Streams 插件,则 Neo4j 源实例将配置如下

streams.sink.enabled=false
streams.source.enabled=true
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>

而 Neo4j 接收器实例将配置如下

streams.source.enabled=false
streams.sink.topic.cdc.schema=<topic-name>
streams.sink.enabled=true

Confluent Cloud 是否支持 Neo4j Streams?

如果需要将连接器作为托管服务运行,那么答案是否定的。有兴趣将 Neo4j-Streams 作为 Confluent 云托管连接器运行的用户应该向 Confluent 提出此请求。目前,只有少数连接器可以作为托管服务运行,例如用于 S3 的连接器。点击 这里 了解详情。

但是,它在某种意义上是受支持的,因为 Neo4j Streams 可以连接到 Confluent Cloud 实例,并且 Confluent Cloud 部分解释了基本配置。以下链接提供了有关如何配置它以连接到 Confluent Cloud 的其他参考

Kafka 输出事件说明

如果您将 Neo4j Streams 插件配置为接收器,使用 Cypher 查询将数据从 Kafka 导入 Neo4j,观察 Kafka 控制台消费者输出,您将看到描述节点和关系创建的 JSON 事件。它们看起来如下

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}

一个 JSON 事件必须一次只描述一个 Neo4j 实体(节点/关系)。

指定的查询被认为是唯一的,因此所有涉及的实体都属于同一个事务

  • txId 标识影响实体的事务

  • txEventId 是一个计数器,标识 Neo4j 处理特定事件的内部顺序

如何配置 Kafka over SSL?

您将在此处找到一个指南 这里,说明如何配置 Neo4j Streams 插件以使用 Kafka over SSL。在幕后,Neo4j Streams 插件使用 Kafka 的官方 Java 库,因此您可以像配置 Java 客户端一样进行配置。

如果您还想使用 Kerberos,以下教程可能会有所帮助:https://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

此外,还可以查看 Confluent Kafka 官方文档以获取有关此主题的更多详细信息。以下是一些有用的链接

启用 DLQ 功能

为了启用 DLQ 功能,您必须指定以下属性

streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

必须指定这些属性,因为它们的默认值为 false。通过指定它们,您将能够记录错误和错误消息。此外,您还需要声明以下属性,如果省略,则没有 DLQ

streams.sink.errors.deadletterqueue.topic.name=<topic_name>

有关更多详细信息,请查看以下部分:如何处理错误数据

支持的 Kafka 反序列化器

Neo4j Streams 支持两种类型的反序列化器

  • org.apache.kafka.common.serialization.ByteArrayDeserializer,如果您想管理 JSON 消息

  • io.confluent.kafka.serializers.KafkaAvroDeserializer,如果您想管理 AVRO 消息

如果使用 AVRO,则还需要一个模式注册表配置

kafka.schema.registry.url=*.*.*.*:8081

其中 8081 是 Confluent 模式注册表的默认端口。

具有多个分区设置的 Kafka 集群和主题

如果环境是 Kafka 集群,由以下组成

  • 多个 Zookeeper 服务器

  • 多个 Kafka 代理

  • 具有多个分区的主题

  • 配置为接收器的 Neo4j 实例

重要的是要正确设置 Zookeeper 服务器。这意味着 Zookeeper 实例的数量必须为 2n+1,其中 n 是任何大于 0 的数字。这是因为奇数个服务器允许 ZooKeeper 为领导权执行多数选举。

因此,如果集群设置不正确,可能会发生某些分区中生成的事件未被读取。

请查看以下链接以获取更多详细信息