配置系统
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。 可以在此处找到 Kafka Connect Neo4j 连接器的最新版本。 |
配置系统概述
配置信息的位置
对于 4.0.7
之前的版本,配置管理使用 neo4j.conf
文件中提供的属性静态工作;从 4.0.7
版本开始,我们引入了一个基于动态重新加载的新配置系统,该系统依赖于 $NEO4J_HOME/conf
中的 streams.conf
文件。
基于过程更改
从 4.0.7
版本开始,您将找到三个新的过程
-
streams.configuration.get
返回当前配置 -
streams.configuration.set({<plugin_config_map>}, {<procedure_config>})
应用新配置并返回它 -
streams.configuration.remove({<plugin_config_keys_list>}, {<procedure_config>})
删除提供的配置键并返回新的配置状态
注意 这些过程适用于每个数据库实例,这意味着如果您正在更改影响所有数据库的属性,则正在运行 Streams 模块的所有数据库都将收到通知并重新启动。
streams.configuration.get
此过程返回应用于 接收器
和 源
插件的当前配置。
输出参数
变量名称 | 描述 |
---|---|
|
配置名称 |
|
配置值 |
因此,给定以下过程调用
CALL streams.configuration.get() YIELD name, value
RETURN name, value
您将获得以下输出(它与您的配置相关😄)
name | value |
---|---|
"streams.sink.topic.cypher.test" |
"CREATE (p:Person{name: event.name, surname: event.surname})" |
"streams.sink.errors.tolerance" |
"all" |
"kafka.default.api.timeout.ms" |
"5000" |
"kafka.bootstrap.servers" |
"broker:9093" |
"streams.sink.errors.log.include.messages" |
"true" |
"streams.sink.enabled" |
"true" |
"streams.sink.errors.log.enable" |
"true" |
streams.configuration.set
此过程应用作为第一个参数传递的配置参数映射。
输入参数
变量名称 | 描述 |
---|---|
|
此映射表示应用于 |
|
配置映射 |
输出参数
变量名称 | 描述 |
---|---|
|
配置名称 |
|
配置值 |
遵循 procedure_config
的已接受参数
配置名称 | 描述 |
---|---|
|
(布尔值,默认为 |
因此,给定以下过程调用
CALL streams.configuration.set({`streams.sink.topic.cypher.test`: 'CREATE (p:Person{name: event.name, surname: event.surname, fullName: event.name + ' ' + event.surname})'}, {save: false}) YIELD name, value
RETURN name, value
您将获得以下输出(它与您的配置相关😄)
name | value |
---|---|
"streams.sink.topic.cypher.test" |
"CREATE (p:Person{name: event.name, surname: event.surname})" |
"streams.sink.errors.tolerance" |
"all" |
"kafka.default.api.timeout.ms" |
"5000" |
"kafka.bootstrap.servers" |
"broker:9093" |
"streams.sink.errors.log.include.messages" |
"true" |
"streams.sink.enabled" |
"true" |
"streams.sink.errors.log.enable" |
"true" |
streams.configuration.remove
此过程从配置中删除提供的键列表。
输入参数
变量名称 | 描述 |
---|---|
|
此列表表示将从配置中删除的属性集。 |
|
配置映射 |
输出参数
变量名称 | 描述 |
---|---|
|
配置名称 |
|
配置值 |
遵循 procedure_config
的已接受参数
配置名称 | 描述 |
---|---|
|
(布尔值,默认为 |
因此,给定以下过程调用
CALL streams.configuration.remove([`kafka.acks`], {save: false}) YIELD name, value
RETURN name, value
您将获得以下输出(它与您的配置相关😄)
name | value |
---|---|
"streams.sink.topic.cypher.test" |
"CREATE (p:Person{name: event.name, surname: event.surname})" |
"streams.sink.errors.tolerance" |
"all" |
"kafka.default.api.timeout.ms" |
"5000" |
"kafka.bootstrap.servers" |
"broker:9093" |
"streams.sink.errors.log.include.messages" |
"true" |
"streams.sink.enabled" |
"true" |
"streams.sink.errors.log.enable" |
"true" |
从过程更改配置属性时会发生什么
当我们从 streams.configuration.set/remove
更改配置属性时,在幕后 接收器
和 源
模块将重新加载。因此,请谨慎使用,因为它会影响您的流流程。
注意 仅当与自身相关的配置发生更改时,源/接收器模块才会重新启动;这意味着如果您同时激活了两个模块,并且您更改了与接收器相关的属性,则只有它会被重新启动。
源模块中会发生什么
在重新加载过程中,事务事件处理程序将被断开连接,这意味着即使在重新加载期间发生的任何事务也不会被源
捕获,因此它们会丢失。
接收器模块中会发生什么
在重新加载过程中,接收器将停止,这不会对您的摄取过程产生任何影响,因为它将从上次提交的消息开始重新启动,因此不会丢失任何数据。