源配置设置

连接器设置

名称 描述

connector.class 必填

org.neo4j.connectors.kafka.source.Neo4jConnector

key.converter 必填

兼容的 Kafka Connect 转换器。

可以是以下之一;

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

key.converter.schema.registry.url

消息密钥的模式注册表 URL。当配置的 key.converter 需要模式注册表时需要。

key.converter.optional.for.nullables

key.converterio.confluent.connect.protobuf.ProtobufConverter 时,应设置为 true

value.converter 必填

兼容的 Kafka Connect 转换器。

可以是以下之一;

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

value.converter.schema.registry.url

消息值的模式注册表 URL。当配置的 value.converter 需要模式注册表时需要。

value.converter.optional.for.nullables

key.converterio.confluent.connect.protobuf.ProtobufConverter 时,应设置为 true

Neo4j 连接设置

名称 描述

neo4j.uri 必填

要连接的 Neo4j URI。可以指定多个 URI,用 , 分隔。

neo4j.database

要连接的 Neo4j 数据库名称。建议明确指定。

neo4j.authentication.type 必填

要使用的身份验证类型。NONEBASICKERBEROSBEARERCUSTOM 之一

默认:BASIC

neo4j.authentication.basic.username

用于身份验证的用户名。当 neo4j.authentication.typeBASIC 时需要。

neo4j.authentication.basic.password

用于身份验证的密码。当 neo4j.authentication.typeBASIC 时需要。

neo4j.authentication.basic.realm

用于身份验证的身份验证区域,留空以使用默认区域。

neo4j.authentication.kerberos.ticket

用于建立连接的 Kerberos 票证。当 neo4j.authentication.typeKERBEROS 时需要。

neo4j.authentication.bearer.token

用于建立连接的承载令牌。当 neo4j.authentication.typeBEARER 时需要。

neo4j.authentication.custom.scheme

用于建立连接的自定义身份验证方案。当 neo4j.authentication.typeCUSTOM 时需要。

neo4j.authentication.custom.principal

用于建立连接的自定义主体。当 neo4j.authentication.typeCUSTOM 时需要。

neo4j.authentication.custom.credentials

用于建立连接的自定义凭据。当 neo4j.authentication.typeCUSTOM 时需要。

neo4j.authentication.custom.realm

用于身份验证的自定义身份验证区域,根据您的自定义身份验证提供程序设置。

neo4j.connection-timeout

TCP 连接超时(有效单位:mssmhd;默认单位为 s)。

默认:30s(驱动程序默认值)

neo4j.pool.max-connection-pool-size

要保存在连接池中的最大连接数。

默认:100(驱动程序默认值)

neo4j.pool.connection-acquisition-timeout

从连接池获取连接的最大等待时间(有效单位:mssmhd;默认单位为 s)。

默认:60s(驱动程序默认值)

neo4j.pool.idle-time-before-connection-test

空闲连接在测试其活动性之前的时间段(有效单位:mssmhd;默认单位为 s)。

默认:no test(驱动程序默认值)

neo4j.pool.max-connection-lifetime

连接从连接池中删除的时间段(有效单位:`mssmhd;默认单位为 s)。

默认:1h(驱动程序默认值)

neo4j.max-retry-time

重试事务的最大持续时间。

默认:30s

neo4j.security.encrypted

是否启用加密。仅在 neo4j.uri 中使用 boltneo4j 方案时适用。truefalse 之一

默认:false

neo4j.security.trust-strategy

用于 TLS 连接的信任策略。TRUST_ALL_CERTIFICATESTRUST_SYSTEM_CA_SIGNED_CERTIFICATESTRUST_CUSTOM_CA_SIGNED_CERTIFICATES 之一 当 neo4j.security.encryptedtrue 时需要。

默认:TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

neo4j.security.cert-files

包含要信任的 CA 的 X509 证书的文件列表。当 neo4j.security.trust-strategyTRUST_CUSTOM_CA_SIGNED_CERTIFICATES 时需要。

neo4j.security.hostname-verification-enabled

TLS 握手期间是否启用主机名验证。truefalse 之一

默认:true

通用源设置

名称 描述

neo4j.source-strategy 必填

此连接器的源策略。CDCQUERY 之一。

neo4j.start-from 必填

开始流式传输的时间锚点。EARLIESTNOWUSER_PROVIDED 之一。仅在连接器实例的初始运行时使用,并且在 Kafka Connect 中已存在存储的偏移量时忽略。

默认:NOW

neo4j.start-from.value

用作起始偏移量的自定义值。当 neo4j.start-from 设置为 USER_PROVIDED 时需要。

neo4j.ignore-stored-offset

是否忽略从上次运行保存的偏移量存储中检索到的任何偏移量值。truefalse 之一。

默认:false

neo4j.batch-size

每个轮询周期发布的最大更改事件数。

默认:1000

neo4j.payload-mode

定义更改消息的结构。COMPACTEXTENDED 之一。COMPACT 提供更简单的消息,但如果属性类型发生更改,则会遇到模式兼容性问题。EXTENDED 包括类型信息以避免此类问题。

默认:EXTENDED

CDC 策略设置

名称 描述

neo4j.cdc.topic.{NAME}.key-strategy

CDC 主题密钥的序列化策略。SKIPELEMENT_IDENTITY_KEYSWHOLE_VALUE 之一。

默认:WHOLE_WALUE

neo4j.cdc.topic.{NAME}.value-strategy

CDC 主题密钥的序列化策略。CHANGE_EVENTENTITY_EVENT 之一。

默认:CHANGE_EVENT

注意:如果您使用的是具有 CDC 策略之一的接收器连接器,则此设置必须配置为 CHANGE_EVENT

neo4j.cdc.poll-duration

Kafka Connect 轮询请求等待从数据库接收更改的最大时间量(有效单位:mssmhd;默认单位为 s

默认:5s

neo4j.cdc.poll-interval

neo4j.cdc.poll-duration 期间数据库轮询更改的间隔(有效单位:mssmhd;默认单位为 s)。

默认:1s

neo4j.cdc.topic.{NAME}.patterns

用逗号分隔的图模式列表,以及可选的事件属性过滤器(以 +- 为前缀,分别表示包含或排除)。

示例设置:"neo4j.cdc.topic.my-topic.patterns": "(:Person {+name}),(:Person)-[:WORKS_FOR]→(:Company),(:Company {-id})"Company 节点的更改事件不会包含 id 属性。

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.pattern

索引的图模式,以及可选的事件属性过滤器(以 +- 为前缀,分别表示包含或排除)。

示例:"neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Company {+name})"。针对Company节点的变更事件将始终包含name属性(在 CDC FULL 丰富模式下),或者当name属性发生变化时(在 CDC DIFF 丰富模式下)。

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.operation

CREATEUPDATEDELETE 之一。针对相应索引图模式,我们希望接收其变更事件的操作。

示例:"neo4j.cdc.topic.my-topic.patterns.0.operation": "DELETE"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.changesTo

用逗号分隔的属性名称列表。

示例:"neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name, age"(参见 选择器如何组合

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.authenticatedUser

执行变更时已验证身份的用户。

示例:"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "userA"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.executingUser

执行变更时执行事务的用户。通常与authenticatedUser相同,但如果使用用户模拟,则可能发生变化。

示例:"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "userB"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.txMetadata.{KEY}

键值事务元数据选择器。键值对与实际事务元数据匹配(键去掉了上述前缀)。

示例:"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "neo4j-browser"

QUERY 策略设置

名称 描述

neo4j.query.topic 必填

发布通过提供的查询收集的变更事件的 Kafka 主题。

neo4j.query 必填

用于收集变更的 Cypher 查询。需要结果集中同时存在neo4j.query.streaming-property 和用于跟踪变更的$lastCheck 查询参数。

neo4j.query.streaming-property

属性名称,该名称既存在于指定查询的结果集中,也用作从先前值查询变更的过滤器。

默认:timestamp

neo4j.query.poll-duration

Kafka Connect 轮询请求等待从数据库接收变更的最大时间量(有效单位:mssmhd;默认单位为 s)。

默认:5s

neo4j.query.poll-interval

neo4j.query.poll-duration期间数据库轮询变更的间隔(有效单位:mssmhd;默认单位为 s)。

默认:1s

neo4j.query.timeout

查询允许运行的最大时间量(有效单位:mssmhd;默认单位为 s)。

默认:0s