Neo4j 流 - 过程
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j 流不再处于积极开发中,并且在 Neo4j 4.4 版本之后将不再受支持。 可以在 此处找到最新版本的 Kafka Connect Neo4j 连接器。 |
Streams 项目提供了一系列过程。
配置
您可以通过更改 neo4j.conf
中的此变量来启用/禁用过程
streams.procedures.enabled=<true/false, default=true>
请注意,默认情况下,
如果您尝试在未将其声明到白名单的情况下调用 Streams 过程之一,您将收到类似于以下内容的错误 ![]() 图 1. Neo4j 流过程未找到
|
多数据库支持
Neo4j 4.0 Enterprise 具有 多租户支持,为了支持此功能,您可以为每个数据库实例设置一个带有以下模式 <DB_NAME>
的配置后缀,以应用于 neo4j.conf 文件中的属性。
因此,要启用 Streams 过程,应添加以下属性
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>
因此,如果您有一个实例名称 foo
,您可以通过以下方式指定配置
streams.procedures.enabled.foo=<true/false, default=true>
旧属性
streams.procedures.enabled=<true/false, default=true>
仍然有效,它指的是 Neo4j 的默认数据库实例。
特别是,以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数,则使用此值。
streams.procedures.enabled=<true/false, default=true>
streams.publish
此过程允许使用底层配置的生产者从 Neo4j 自定义消息流到配置的环境。
用途
CALL streams.publish('my-topic', 'Hello World from Neo4j!')
从消费者检索到的消息如下所示
{"payload":"Hello world from Neo4j!"}
如果您使用本地 docker (compose) 设置,您可以使用以下命令检查这些消息
docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092
输入参数
变量名称 | 类型 | 描述 |
---|---|---|
|
字符串 |
要发布数据的主题 |
|
对象 |
要流式传输的数据 |
配置参数
名称 | 类型 | 描述 |
---|---|---|
|
对象 |
要流式传输的消息的键值。请注意,如果键不存在,您将收到一个消息,该消息具有一个随机 UUID 作为键值 |
|
整数 |
要流式传输的消息的分区 |
您可以在有效负载中发送任何类型的数据,包括节点、关系、路径、列表、映射、标量值和嵌套版本。
对于节点或关系,如果主题在配置提供的模式中定义,它们的属性将根据配置进行过滤。
streams.publish.sync
类似于 streams.publish
过程,但以同步方式执行。
用途
CALL streams.publish.sync('my-topic', 'my-payload', {<config>}) YIELD value RETURN value
此过程返回一个 RecordMetadata
值,例如 {"timestamp": 1, "offset": 2, "partition", 3, "keySize", 4, "valueSize", 5}
变量名称 | 描述 |
---|---|
|
主题/分区中记录的时间戳。 |
|
主题/分区中记录的偏移量。 |
|
记录发送到的分区 |
|
序列化后未压缩键的大小(以字节为单位) |
|
序列化后未压缩值的大小(以字节为单位) |
streams.consume
此过程允许从给定主题消费消息。
用途
CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event
示例:假设您有一个生产者发布类似 {"name": "Andrea", "surname": "Santurbano"}
的事件,我们可以通过以下方式创建用户节点
CALL streams.consume('my-topic') YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
如果您想读取主题分区的特定偏移量,您可以通过执行以下查询来实现
CALL streams.consume('my-topic', {timeout: 5000, partitions: [{partition: 0, offset: 30}]}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
输入参数
变量名称 | 类型 | 描述 |
---|---|---|
|
字符串 |
要发布数据的主题 |
|
Map<K,V> |
配置参数 |
可用的配置参数
变量名称 | 类型 | 描述 |
---|---|---|
|
数字(默认值 |
定义过程应监听主题的时间 |
|
字符串 |
它是 Kafka 配置参数 |
|
字符串 |
它是 Kafka 配置参数 |
|
布尔值(默认值 |
它是 Kafka 配置参数 |
|
布尔值(默认值 |
如果 |
|
字符串 |
Kafka 节点 url 的逗号分隔字符串。如果未指定,它将继承底层 |
|
List<Map<K,V>> |
该映射包含有关分区和偏移量的信息,以便从 |
|
字符串 |
Kafka 记录键支持的解串器如果未指定,它将继承底层 |
|
字符串 |
Kafka 记录值的支持的解串器如果未指定,它将继承底层 |
|
字符串 |
模式注册表 url,如果您处理 AVRO 消息,则需要此 url。 |
Streams 接收器生命周期过程
我们提供了一组过程来管理接收器生命周期。
过程名称 | 描述 |
---|---|
|
停止 Sink 并返回状态,如果过程中出现错误,则返回错误信息。 |
|
启动 Sink 并返回状态,如果过程中出现错误,则返回错误信息。 |
|
重启 Sink 并返回状态,如果过程中出现错误,则返回错误信息。 |
|
返回 Sink 配置,请查看“Streams 配置”表格。 |
|
返回状态。 |
请注意,要使用这些过程,您必须启用 Streams 过程,并且它们只能在领导者节点上运行。 |
配置名称 | 描述 |
---|---|
invalid_topics |
返回无效主题列表。 |
streams.sink.topic.pattern.relationship |
返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的模式。 |
streams.sink.topic.cud |
返回为 CUD 格式定义的主题列表。 |
streams.sink.topic.cdc.sourceId |
返回为 CDC SourceId 策略定义的主题列表。 |
streams.sink.topic.cypher |
返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的 Cypher 查询。 |
streams.sink.topic.cdc.schema |
返回为 CDC Schema 策略定义的主题列表。 |
streams.sink.topic.pattern.node |
返回一个 Map<K,V>,其中 K 是主题名称,V 是提供的模式。 |
streams.sink.errors |
返回一个 Map<K,V>,其中 K 是子属性名称,V 是值。 |
streams.sink.source.id.strategy.config |
返回 SourceId CDC 策略的配置。 |
示例
Executing: CALL streams.sink.config()
+----------------------------------------------------------------------------------------------------------------------------------------------+
| name | value |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| "streams.sink.errors" | {} |
| "streams.sink.source.id.strategy.config" | {idName -> "sourceId", labelName -> "SourceEvent"} |
| "streams.sink.topic.cypher" | {shouldWriteCypherQuery -> "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"} |
| "streams.sink.topic.cud" | [] |
| "streams.sink.topic.cdc.schema" | [] |
| "streams.sink.topic.cdc.sourceId" | [] |
| "streams.sink.topic.pattern.node" | {} |
| "streams.sink.topic.pattern.relationship" | {} |
| "invalid_topics" | [] |
+----------------------------------------------------------------------------------------------------------------------------------------------+
9 rows
Executing: CALL streams.sink.stop()
+----------------------+
| name | value |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name | value |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.start()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.restart()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Given a cluster env, executing in a NON LEADER: CALL streams.sink.status()
+--------------------------------------------------------------------------------------------------+
| name | value |
+--------------------------------------------------------------------------------------------------+
| "error" | "You can use this procedure only in the LEADER or in a single instance configuration." |
+--------------------------------------------------------------------------------------------------+
1 row