Neo4j 流 - 过程

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j 流不再处于积极开发中,并且在 Neo4j 4.4 版本之后将不再受支持。

可以在 此处找到最新版本的 Kafka Connect Neo4j 连接器。

Streams 项目提供了一系列过程。

配置

您可以通过更改 neo4j.conf 中的此变量来启用/禁用过程

neo4j.conf
streams.procedures.enabled=<true/false, default=true>

请注意,默认情况下,dbms.security.procedures.whitelist 属性被禁用,因此 Neo4j 将加载找到的所有过程。如果您启用它,那么您还必须声明一个逗号分隔的要默认加载的过程列表。例如

dbms.security.procedures.whitelist=streams.*,apoc.*

如果您尝试在未将其声明到白名单的情况下调用 Streams 过程之一,您将收到类似于以下内容的错误

procedure not found
图 1. Neo4j 流过程未找到

多数据库支持

Neo4j 4.0 Enterprise 具有 多租户支持,为了支持此功能,您可以为每个数据库实例设置一个带有以下模式 <DB_NAME> 的配置后缀,以应用于 neo4j.conf 文件中的属性。

因此,要启用 Streams 过程,应添加以下属性

neo4j.conf
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>

因此,如果您有一个实例名称 foo,您可以通过以下方式指定配置

neo4j.conf
streams.procedures.enabled.foo=<true/false, default=true>

旧属性

neo4j.conf
streams.procedures.enabled=<true/false, default=true>

仍然有效,它指的是 Neo4j 的默认数据库实例。

特别是,以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数,则使用此值。

streams.procedures.enabled=<true/false, default=true>

streams.publish

此过程允许使用底层配置的生产者从 Neo4j 自定义消息流到配置的环境。

用途

CALL streams.publish('my-topic', 'Hello World from Neo4j!')

从消费者检索到的消息如下所示

{"payload":"Hello world from Neo4j!"}

如果您使用本地 docker (compose) 设置,您可以使用以下命令检查这些消息

docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092

输入参数

变量名称 类型 描述

topic

字符串

要发布数据的主题

payload

对象

要流式传输的数据

配置参数

名称 类型 描述

key

对象

要流式传输的消息的键值。请注意,如果键不存在,您将收到一个消息,该消息具有一个随机 UUID 作为键值

partition

整数

要流式传输的消息的分区

您可以在有效负载中发送任何类型的数据,包括节点、关系、路径、列表、映射、标量值和嵌套版本。

对于节点或关系,如果主题在配置提供的模式中定义,它们的属性将根据配置进行过滤。

streams.publish.sync

类似于 streams.publish 过程,但以同步方式执行。

用途

CALL streams.publish.sync('my-topic', 'my-payload', {<config>}) YIELD value RETURN value

此过程返回一个 RecordMetadata 值,例如 {"timestamp": 1, "offset": 2, "partition", 3, "keySize", 4, "valueSize", 5}

变量名称 描述

timestamp

主题/分区中记录的时间戳。

offset

主题/分区中记录的偏移量。

partition

记录发送到的分区

keySize

序列化后未压缩键的大小(以字节为单位)

valueSize

序列化后未压缩值的大小(以字节为单位)

streams.consume

此过程允许从给定主题消费消息。

用途

CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event

示例:假设您有一个生产者发布类似 {"name": "Andrea", "surname": "Santurbano"} 的事件,我们可以通过以下方式创建用户节点

CALL streams.consume('my-topic') YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

如果您想读取主题分区的特定偏移量,您可以通过执行以下查询来实现

CALL streams.consume('my-topic', {timeout: 5000, partitions: [{partition: 0, offset: 30}]}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

输入参数

变量名称 类型 描述

topic

字符串

要发布数据的主题

config

Map<K,V>

配置参数

可用的配置参数

变量名称 类型 描述

timeout

数字(默认值 1000

定义过程应监听主题的时间

from

字符串

它是 Kafka 配置参数 auto.offset.reset。如果未指定,它将继承底层 kafka.auto.offset.reset

groupId

字符串

它是 Kafka 配置参数 group.id。如果未指定,它将继承底层 kafka.group.id

autoCommit

布尔值(默认值 true

它是 Kafka 配置参数 enable.auto.commit。如果未指定,它将继承底层 kafka.enable.auto.commit

commit

布尔值(默认值 true

如果 autoCommit 设置为 false,您可以决定是否要提交数据。

broker

字符串

Kafka 节点 url 的逗号分隔字符串。如果未指定,它将继承底层 kafka.bootstrap.servers

partitions

List<Map<K,V>>

该映射包含有关分区和偏移量的信息,以便从

keyDeserializer

字符串

Kafka 记录键支持的解串器如果未指定,它将继承底层 kafka.key.deserializer 值。支持的解串器有:org.apache.kafka.common.serialization.ByteArrayDeserializerio.confluent.kafka.serializers.KafkaAvroDeserializer

valueDeserializer

字符串

Kafka 记录值的支持的解串器如果未指定,它将继承底层 kafka.value.deserializer 值支持的解串器有:org.apache.kafka.common.serialization.ByteArrayDeserializerio.confluent.kafka.serializers.KafkaAvroDeserializer

schemaRegistryUrl

字符串

模式注册表 url,如果您处理 AVRO 消息,则需要此 url。

分区

变量名称 类型 描述

partition

数字

它是 Kafka 分区号,用于读取

offset

数字

它是从主题分区开始读取的偏移量

Streams 接收器生命周期过程

我们提供了一组过程来管理接收器生命周期。

过程名称 描述

CALL streams.sink.stop() YIELD name, value

停止 Sink 并返回状态,如果过程中出现错误,则返回错误信息。

CALL streams.sink.start() YIELD name, value

启动 Sink 并返回状态,如果过程中出现错误,则返回错误信息。

CALL streams.sink.restart() YIELD name, value

重启 Sink 并返回状态,如果过程中出现错误,则返回错误信息。

CALL streams.sink.config() YIELD name, value

返回 Sink 配置,请查看“Streams 配置”表格。

CALL streams.sink.status() YIELD name, value

返回状态。

请注意,要使用这些过程,您必须启用 Streams 过程,并且它们只能在领导者节点上运行。
表 1. Streams 配置
配置名称 描述

invalid_topics

返回无效主题列表。

streams.sink.topic.pattern.relationship

返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的模式。

streams.sink.topic.cud

返回为 CUD 格式定义的主题列表。

streams.sink.topic.cdc.sourceId

返回为 CDC SourceId 策略定义的主题列表。

streams.sink.topic.cypher

返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的 Cypher 查询。

streams.sink.topic.cdc.schema

返回为 CDC Schema 策略定义的主题列表。

streams.sink.topic.pattern.node

返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的模式。

streams.sink.errors

返回一个 Map<K,V>,其中 K 是子属性名称,V 是值。

streams.sink.source.id.strategy.config

返回 SourceId CDC 策略的配置。

示例

Executing: CALL streams.sink.config()
+----------------------------------------------------------------------------------------------------------------------------------------------+
| name                                      | value                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| "streams.sink.errors"                     | {}                                                                                               |
| "streams.sink.source.id.strategy.config"  | {idName -> "sourceId", labelName -> "SourceEvent"}                                               |
| "streams.sink.topic.cypher"               | {shouldWriteCypherQuery -> "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"} |
| "streams.sink.topic.cud"                  | []                                                                                               |
| "streams.sink.topic.cdc.schema"           | []                                                                                               |
| "streams.sink.topic.cdc.sourceId"         | []                                                                                               |
| "streams.sink.topic.pattern.node"         | {}                                                                                               |
| "streams.sink.topic.pattern.relationship" | {}                                                                                               |
| "invalid_topics"                          | []                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------+
9 rows
Executing: CALL streams.sink.stop()
+----------------------+
| name     | value     |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name     | value     |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.start()
+----------------------+
| name     | value     |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name     | value     |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.restart()
+----------------------+
| name     | value     |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Given a cluster env, executing in a NON LEADER: CALL streams.sink.status()
+--------------------------------------------------------------------------------------------------+
| name    | value                                                                                  |
+--------------------------------------------------------------------------------------------------+
| "error" | "You can use this procedure only in the LEADER or in a single instance configuration." |
+--------------------------------------------------------------------------------------------------+
1 row