配置系统

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。

可以在此处找到 Kafka Connect Neo4j 连接器的最新版本。

配置系统概述

配置信息的位置

对于 4.0.7 之前的版本,配置管理使用 neo4j.conf 文件中提供的属性静态工作;从 4.0.7 版本开始,我们引入了一个基于动态重新加载的新配置系统,该系统依赖于 $NEO4J_HOME/conf 中的 streams.conf 文件。

重大更改

我们弃用了基于 neo4j.conf 文件的配置,因此您需要在 $NEO4J_HOME/conf 中定义一个新的 streams.conf 文件,并在其中放入所有必需的配置。

关于与 Docker 一起使用的说明

官方 Neo4j Docker 镜像使用特定的环境变量命名约定,以便将其转换为 neo4j.conf 文件中的属性。为了符合该行为,您仍然可以使用它们而无需更改配置中的任何内容,在幕后,从 4.0.7 版本开始,系统将改为将其保存到 streams.conf 文件中。

其工作原理

您可以通过两种方式与新的配置系统交互

  • 手动更改 streams.conf

  • 通过过程应用新的配置

您必须考虑对配置所做的任何更改都会导致插件重新加载,因此您必须非常小心地使用此功能。

基于文件更改

streams.conf 中的每个更改都会被收集并使用新配置重新加载 Streams 接收器

基于过程更改

4.0.7 版本开始,您将找到三个新的过程

  • streams.configuration.get 返回当前配置

  • streams.configuration.set({<plugin_config_map>}, {<procedure_config>}) 应用新配置并返回它

  • streams.configuration.remove({<plugin_config_keys_list>}, {<procedure_config>}) 删除提供的配置键并返回新的配置状态

注意 这些过程适用于每个数据库实例,这意味着如果您正在更改影响所有数据库的属性,则正在运行 Streams 模块的所有数据库都将收到通知并重新启动。

streams.configuration.get

此过程返回应用于 接收器 插件的当前配置。

输出参数

变量名称 描述

name

配置名称

value

配置值

因此,给定以下过程调用

CALL streams.configuration.get() YIELD name, value
RETURN name, value

您将获得以下输出(它与您的配置相关😄)

name value

"streams.sink.topic.cypher.test"

"CREATE (p:Person{name: event.name, surname: event.surname})"

"streams.sink.errors.tolerance"

"all"

"kafka.default.api.timeout.ms"

"5000"

"kafka.bootstrap.servers"

"broker:9093"

"streams.sink.errors.log.include.messages"

"true"

"streams.sink.enabled"

"true"

"streams.sink.errors.log.enable"

"true"

streams.configuration.set

此过程应用作为第一个参数传递的配置参数映射。

输入参数

变量名称 描述

plugin_config_map

此映射表示应用于 接收器 的配置集

procedure_config

配置映射

输出参数

变量名称 描述

name

配置名称

value

配置值

遵循 procedure_config 的已接受参数

配置名称 描述

save

(布尔值,默认为 true)是否将配置持久化到文件中,以便在数据库因任何原因重新启动后将其恢复

因此,给定以下过程调用

CALL streams.configuration.set({`streams.sink.topic.cypher.test`: 'CREATE (p:Person{name: event.name, surname: event.surname, fullName: event.name + ' ' + event.surname})'}, {save: false}) YIELD name, value
RETURN name, value

您将获得以下输出(它与您的配置相关😄)

name value

"streams.sink.topic.cypher.test"

"CREATE (p:Person{name: event.name, surname: event.surname})"

"streams.sink.errors.tolerance"

"all"

"kafka.default.api.timeout.ms"

"5000"

"kafka.bootstrap.servers"

"broker:9093"

"streams.sink.errors.log.include.messages"

"true"

"streams.sink.enabled"

"true"

"streams.sink.errors.log.enable"

"true"

streams.configuration.remove

此过程从配置中删除提供的键列表。

输入参数

变量名称 描述

plugin_config_keys_list

此列表表示将从配置中删除的属性集。

procedure_config

配置映射

输出参数

变量名称 描述

name

配置名称

value

配置值

遵循 procedure_config 的已接受参数

配置名称 描述

save

(布尔值,默认为 true)是否将配置持久化到文件中,以便在数据库因任何原因重新启动后将其恢复

因此,给定以下过程调用

CALL streams.configuration.remove([`kafka.acks`], {save: false}) YIELD name, value
RETURN name, value

您将获得以下输出(它与您的配置相关😄)

name value

"streams.sink.topic.cypher.test"

"CREATE (p:Person{name: event.name, surname: event.surname})"

"streams.sink.errors.tolerance"

"all"

"kafka.default.api.timeout.ms"

"5000"

"kafka.bootstrap.servers"

"broker:9093"

"streams.sink.errors.log.include.messages"

"true"

"streams.sink.enabled"

"true"

"streams.sink.errors.log.enable"

"true"

从过程更改配置属性时会发生什么

当我们从 streams.configuration.set/remove 更改配置属性时,在幕后 接收器 模块将重新加载。因此,请谨慎使用,因为它会影响您的流流程。

注意 仅当与自身相关的配置发生更改时,源/接收器模块才会重新启动;这意味着如果您同时激活了两个模块,并且您更改了与接收器相关的属性,则只有它会被重新启动。

源模块中会发生什么

在重新加载过程中,事务事件处理程序将被断开连接,这意味着即使在重新加载期间发生的任何事务也不会被捕获,因此它们会丢失

接收器模块中会发生什么

在重新加载过程中,接收器将停止,这不会对您的摄取过程产生任何影响,因为它将从上次提交的消息开始重新启动,因此不会丢失任何数据。