Neo4j Streams 常见问题解答
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发中,并且在 Neo4j 4.4 版本之后将不再受支持。 最新版本的 Kafka Connect Neo4j 连接器可以在 这里 找到。 |
关于 CUD 文件格式
CUD 文件格式是 JSON 文件,它表示图实体(节点/关系)以及如何管理它们(在 **C**reate/**U**pdate/**D**elete 操作方面)。因此,每个 JSON 事件都表示单个操作。有关如何使用这些文件的更多详细信息,请查看 Neo4j Streams 插件的 CUD 文件格式 部分。
如何使用 CDC Schema 策略来摄取事件
更改数据捕获方法允许在不同的 Neo4j 实例之间摄取事件。如果您决定使用 Neo4j Streams 插件,则 Neo4j 源实例将配置如下
streams.sink.enabled=false
streams.source.enabled=true
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
而 Neo4j 接收器实例将配置如下
streams.source.enabled=false
streams.sink.topic.cdc.schema=<topic-name>
streams.sink.enabled=true
Confluent Cloud 是否支持 Neo4j Streams?
如果需要将连接器作为托管服务运行,那么答案是否定的。有兴趣将 Neo4j-Streams 作为 Confluent 云托管连接器运行的用户应该向 Confluent 提出此请求。目前,只有少数连接器可以作为托管服务运行,例如用于 S3 的连接器。点击 这里 了解详情。
但是,它在某种意义上是受支持的,因为 Neo4j Streams 可以连接到 Confluent Cloud 实例,并且 Confluent Cloud 部分解释了基本配置。以下链接提供了有关如何配置它以连接到 Confluent Cloud 的其他参考
-
https://www.confluent.io/blog/kafka-graph-visualizations/ (请参见“配置 Neo4j 与 Kafka 交互”部分)。
Kafka 输出事件说明
如果您将 Neo4j Streams 插件配置为接收器,使用 Cypher 查询将数据从 Kafka 导入 Neo4j,观察 Kafka 控制台消费者输出,您将看到描述节点和关系创建的 JSON 事件。它们看起来如下
{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}
{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}
一个 JSON 事件必须一次只描述一个 Neo4j 实体(节点/关系)。
指定的查询被认为是唯一的,因此所有涉及的实体都属于同一个事务
-
txId
标识影响实体的事务 -
txEventId
是一个计数器,标识 Neo4j 处理特定事件的内部顺序
如何配置 Kafka over SSL?
您将在此处找到一个指南 这里,说明如何配置 Neo4j Streams 插件以使用 Kafka over SSL。在幕后,Neo4j Streams 插件使用 Kafka 的官方 Java 库,因此您可以像配置 Java 客户端一样进行配置。
如果您还想使用 Kerberos,以下教程可能会有所帮助:https://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/
此外,还可以查看 Confluent Kafka 官方文档以获取有关此主题的更多详细信息。以下是一些有用的链接
启用 DLQ 功能
为了启用 DLQ 功能,您必须指定以下属性
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true
必须指定这些属性,因为它们的默认值为 false。通过指定它们,您将能够记录错误和错误消息。此外,您还需要声明以下属性,如果省略,则没有 DLQ
streams.sink.errors.deadletterqueue.topic.name=<topic_name>
有关更多详细信息,请查看以下部分:如何处理错误数据
支持的 Kafka 反序列化器
Neo4j Streams 支持两种类型的反序列化器
-
org.apache.kafka.common.serialization.ByteArrayDeserializer
,如果您想管理 JSON 消息 -
io.confluent.kafka.serializers.KafkaAvroDeserializer
,如果您想管理 AVRO 消息
如果使用 AVRO,则还需要一个模式注册表配置
kafka.schema.registry.url=*.*.*.*:8081
其中 8081 是 Confluent 模式注册表的默认端口。