Neo4j Streams - Sink:Kafka → Neo4j
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发状态,并且在 Neo4j 4.4 版之后将不再受支持。 Kafka Connect Neo4j 连接器的最新版本可以在这里找到 这里。 |
是 Kafka Sink,它将数据直接导入 Neo4j
工作原理
它通过多种方式工作
-
通过提供 Cypher 模板
-
通过通过更改数据捕获模块获取来自另一个 Neo4j 实例发出的事件
-
通过向 JSON 或 AVRO 文件提供模式提取
-
通过管理 CUD 文件格式
Cypher 模板策略是唯一保证消息按到达主题的顺序处理的 Sink 策略。 其他 Sink 策略按操作类型对消息进行分组,这也可以优化为批处理。在这种情况下,执行顺序如下
|
Cypher 模板
它使用存储在以下格式的属性中的模板 Cypher 查询
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
每个 Cypher 模板都必须引用一个将由 Sink 注入的 事件 对象
以下是一个示例
对于此事件
{
"id": 42,
"properties": {
"title": "Answer to anyting",
"description": "It depends."
}
}
streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) \
ON CREATE SET n += event.properties
在后台,Sink 以这种方式将事件对象作为参数注入
UNWIND {events} AS event
MERGE (n:Label {id: event.id})
ON CREATE SET n += event.properties
其中 {events}
是一个 json 列表,因此继续以上面的示例,可能的完整表示可能如下
:params events => [{id:"alice@example.com",properties:{name:"Alice",age:32}},
{id:"bob@example.com",properties:{name:"Bob",age:42}}]
UNWIND {events} AS event
MERGE (n:Label {id: event.id})
ON CREATE SET n += event.properties
当您决定使用 Cypher 模板作为 Sink 策略将数据从 Kafka 导入 Neo4j 时,您必须确保查询的正确性。如果查询未优化,这也会导致可能的性能问题或插件似乎卡住的情况,例如,如果查询将大量节点和关系加载到内存中。我们建议以下操作
|
更改数据捕获事件
此方法允许获取来自另一个 Neo4j 实例的 CDC 事件。您可以使用两种策略
-
SourceId
策略,它通过 CDC 事件id
字段(与 Neo4j 物理 ID 相关)合并节点/关系 -
Schema
策略,它通过图模型中定义的约束(唯一性、节点键)合并节点/关系
SourceId
策略
您可以按以下方式配置主题
streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
streams.sink.topic.cdc.sourceId=my-topic;my-other.topic
每个 streams 事件都将投影到相关图实体中,例如以下事件
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
将持久化为以下节点
Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}
正如您所注意到的,摄取的事件已使用两种特性进行了投影
-
id
字段已转换为sourceId
; -
节点具有附加标签
SourceEvent
;
这两个字段将用于匹配节点/关系以进行将来的更新/删除
Schema
策略
您可以按以下方式配置主题
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=my-topic;my-other.topic
每个 streams 事件都将投影到相关图实体中,例如以下事件
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
将持久化为以下节点
Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}
Schema
策略利用 schema
字段插入/更新节点,因此不会创建额外的字段。
在关系的情况下
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
Schema
策略利用 ids
字段插入/更新关系,因此不会创建额外的字段。
Pattern
策略
Pattern
策略允许您通过提供提取模式从 json 中提取节点和关系
每个属性都可以以以下方式为前缀
-
!
:标识 id(可以有多个属性),这是 必需的 -
-
:从提取中排除属性如果没有指定前缀,则表示将包含该属性
您不能混合包含和排除,因此您的模式必须包含所有排除或包含属性 |
标签可以通过 :
连接
模式策略支持 墓碑记录,为了利用它,您的事件应包含您要删除的记录作为键,以及 null
作为值。
目前,您无法连接多个模式(例如,如果您只使用一个主题并生成多个节点/关系类型)。因此,您必须为每种类型的节点/关系使用不同的主题,并为每个主题定义一个模式 |
节点模式
配置
您可以按以下方式配置节点模式提取
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
例如,给定通过 user
主题发布的以下 json
{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}
您可以通过提供以下配置将其转换为节点
通过指定一个更简单的模式
streams.sink.topic.pattern.node.user=User{!userId}
或者通过指定一个类似 Cypher 的节点模式
streams.sink.topic.pattern.node.user=(:User{!userId})
与 CDC 模式类似,您可以提供
模式 | 含义 |
---|---|
|
userId 将用作 ID 字段,并且 json 的所有属性都将附加到具有提供的标签( |
|
userId 将用作 ID 字段,并且 json 的 仅 surname 属性将附加到具有提供的标签( |
|
userId 将用作 ID 字段,并且 json 的 仅 surname 和 |
|
userId 将用作 ID 字段,并且 |
关系模式
配置
您可以按以下方式配置关系模式提取
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
例如,给定通过 user
主题发布的以下 json
{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}
您可以通过提供以下配置将其转换为路径,例如 (n)-[r]→(m)
通过指定一个更简单的模式
streams.sink.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}
或者通过指定一个类似 Cypher 的节点模式
streams.sink.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})
在最后一种情况下,我们假设 User
是源节点,Product
是目标节点
与 CDC 模式类似,您可以提供
模式 | 含义 |
---|---|
|
这将根据提供的标识符合并获取/创建这两个节点以及它们之间的 |
|
这将根据提供的标识符合并获取/创建这两个节点以及它们之间的 |
|
这将根据提供的标识符合并获取/创建这两个节点以及它们之间的 |
|
这将根据提供的标识符合并获取/创建这两个节点以及它们之间的 |
将属性附加到节点
默认情况下,不会将任何属性附加到边缘节点,但您可以指定要附加到每个节点的属性。假设以下通过user
主题发布的json
{
"userId": 1,
"userName": "Andrea",
"userSurname": "Santurbano",
"productId": 100,
"productName": "My Awesome Product!",
"price": 10,
"currency": "€"
}
模式 | 含义 |
---|---|
|
这将合并两个节点以及它们之间的 |
CUD文件格式
CUD文件格式是JSON文件,表示图实体(节点/关系)以及如何以Create/Update/Delete操作的方式管理它们。
您可以按以下方式配置主题
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=my-topic;my-other.topic
我们有两种格式
-
一种用于节点
我们提供了一个
MERGE
操作的示例{ "op": "merge", "properties": { "foo": "value", "key": 1 }, "ids": {"key": 1, "otherKey": "foo"}, "labels": ["Foo","Bar"], "type": "node", "detach": true }
它将转换为以下Cypher查询
UNWIND [..., {
"op": "merge",
"properties": {
"foo": "value",
"key": 1
},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": true
}, ...] AS event
MERGE (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
SET n += event.properties
让我们描述这些字段
字段 | 必填 | 描述 |
---|---|---|
op |
是 |
操作类型:create/merge/update/delete 注意:delete消息用于单个节点,它并非旨在成为从JSON构建Cypher查询的通用方法。 |
properties |
如果操作为 |
附加到节点的属性 |
ids |
如果操作为 |
如果操作为merge/update/delete,则此字段为必填,并包含将用于查找实体的节点的主键/唯一键。如果您使用 注意:如果您将 |
labels |
否 |
附加到节点的标签。 注意:Neo4j允许创建没有标签的节点,但从性能角度来看,这是一个不好的主意,不要提供它们。 |
type |
是 |
实体类型:node/relationship ⇒ 在本例中为node |
detach |
否 |
如果操作为delete,您可以指定是否执行“detach”删除,这意味着在删除节点时删除任何关联关系 注意:如果未提供值,则默认为true |
-
以及一种用于关系
我们提供了一个CREATE
操作的示例
{
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
它将转换为以下Cypher查询
UNWIND [..., {
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel-type": "MY-REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties
让我们描述这些字段
字段 | 必填 | 描述 |
---|---|---|
op |
是 |
操作类型:create/merge/update/delete |
properties |
否 |
附加到关系的属性 |
rel_type |
是 |
关系类型 |
from |
是,如果您在 |
包含有关关系源节点的信息。有关 |
to |
是,如果您在 |
包含有关关系目标节点的信息。有关 |
type |
是 |
实体类型:node/relationship ⇒ 在本例中为relationship |
以下是节点和关系的DELETE
操作的另一个示例。
-
对于节点,以下JSON
{
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}
将转换为以下Cypher查询
UNWIND [..., {
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}, ...] AS event
MATCH (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
DELETE n
请注意,如果您设置"detach": true
,则转换将为
UNWIND [
...
] AS event
...
DETACH DELETE n
-
对于关系,以下JSON
{
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
将转换为以下Cypher查询
UNWIND [..., {
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherkey: event.to.ids.otherkey})
MATCH (from)-[r:MY_REL]->(to)
DELETE r
我们可以在关系创建/合并时创建不存在的节点,在"from"
和/或"to"
字段中放入"op": "merge"
。默认情况下,"op"为match
,因此如果节点不存在,则不会创建它。例如,我们可以编写
{
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"],
"op": "merge"
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"],
"op": "merge"
},
"type":"relationship"
}
如何处理错误数据
Neo4j Streams插件提供了多种处理处理错误的方法。
它可以快速失败或记录不同详细程度的错误。另一种方法是将所有数据和错误重新路由到死信队列
,因为某些原因无法将其摄取。
它默认的行为类似于Kafka Connect,请参阅此博文 |
-
默认情况下快速失败(中止)
-
需要配置死信队列主题以启用
-
需要显式启用日志记录
-
必须显式启用报头和消息日志记录
配置选项
名称 | 值 | 注意 |
---|---|---|
|
|
快速失败(默认!)中止 |
|
|
all == 宽容,静默忽略错误消息 |
|
|
记录错误(默认:false) |
|
|
也记录错误消息(默认:false) |
|
|
死信队列主题名称,如果省略则没有DLQ,默认:未设置 |
|
|
使用元数据报头(如异常、时间戳、org.topic、org.part)丰富消息,默认:false |
|
|
报头条目的通用前缀,例如 |
|
|
复制因子,需要设置为1以获得单个分区,默认:3 |
对于Neo4j扩展,您在Neo4j配置中以streams.sink
作为前缀。
示例设置
errors.tolerance=none
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq-topic
errors.deadletterqueue.context.headers.enable=true
streams.sink.errors.tolerance=all
streams.sink.errors.deadletterqueue.topic.name=my-dlq-topic
streams.sink.errors.deadletterqueue.context.headers.enable=true
死信队列
中发布的每个记录都包含原始记录的Key
和Value
对,以及可选的以下报头
报头键 | 描述 |
---|---|
|
发布数据所在的主题 |
|
发布数据所在的主题分区 |
|
数据在主题分区中的偏移量 |
|
生成错误的类 |
|
生成错误的异常 |
|
异常消息 |
|
异常堆栈跟踪 |
|
数据库名称 |
支持的Kafka反序列化器
Neo4j Streams插件支持2个反序列化器
-
org.apache.kafka.common.serialization.ByteArrayDeserializer
:如果您想管理JSON消息 -
io.confluent.kafka.serializers.KafkaAvroDeserializer
:如果您想管理AVRO消息
您可以根据配置段落中指定的Key
和Value
分别定义它们
配置摘要
您可以在neo4j.conf
中设置以下Kafka配置值,以下是默认值。
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>
streams.check.apoc.timeout=<ms to await for APOC being loaded, default -1 skip the wait>
streams.check.apoc.interval=<ms interval awaiting for APOC being loaded, default 1000>
streams.sink.poll.interval=<The delay interval between poll cycles, default 0>
有关这些设置的详细信息,请参阅Apache Kafka文档。
如果streams.cluster.only 设置为true,则流将在单实例模式下或在备份操作的上下文中拒绝启动。这是确保在生产部署中意外情况下不发生操作的重要安全保护措施。 |
有关这些设置的详细信息,请参阅Apache Kafka文档。
自定义Kafka配置
在本节中,我们将描述特定Neo4j流Kafka配置的含义
kafka.streams.async.commit
如果kafka.enable.auto.commit=false
,此属性允许您管理如何将消息提交到主题。
可能的值
-
false
(默认)在幕后,我们使用Kafka Consumer的commitSync
方法 -
true
在幕后,我们使用Kafka Consumer的commitAsync
方法
commitSync
VS commitAsync
commitSync
是同步提交,将在提交成功或遇到不可恢复的错误(在这种情况下,它将抛给调用方)之前阻塞。
这意味着,commitSync
是一个具有内部重试机制的阻塞方法,这可能会影响摄取的性能,因为只有在提交结束时才会处理新一批消息。
另一方面,commitAsync
是一个异步调用(因此不会阻塞),并且不提供内部重试机制。
如果您需要确保数据一致性,请选择commitSync
,因为它会确保在执行任何进一步操作之前,您将知道偏移量提交是否成功或失败。但由于它是同步且阻塞的,您将花费更多时间等待提交完成,从而导致高延迟。如果您不介意一定程度的数据不一致并且想要低延迟,请选择commitAsync
,因为它不会等待完成。
多数据库支持
Neo4j 4.0 企业版具有多租户支持,为了支持此功能,您可以为每个数据库实例设置一个配置后缀,其模式为to.<DB_NAME>
,添加到neo4j.conf文件中的属性中。
以下是允许支持多租户的新属性列表
streams.sink.topic.cypher.<TOPIC_NAME>.to.<DB_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.<DB_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.<DB_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.<DB_NAME>=<true/false, default=false>
请注意 |
这意味着对于每个数据库实例,您可以指定是否
-
使用源连接器
-
路由模式
因此,如果您有一个名为foo
的实例,则可以这样指定配置
streams.sink.topic.cypher.<TOPIC_NAME>.to.foo=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.foo=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.foo=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.foo=<true/false, default=false>
旧属性
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>
仍然有效,并且它们引用Neo4j的默认数据库实例,该实例通常称为neo4j
,但可以通过单独的Neo4j系统配置进行控制。
默认数据库由Neo4j的dbms.default_database配置属性控制,因此我们明确了适用于此用户的默认数据库。数据库名称不区分大小写,并规范化为小写,并且必须遵循Neo4j数据库命名规则。(参考:https://neo4j.ac.cn/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration) |
特别是以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数。
streams.sink.enabled=<true/false, default=false>
这意味着如果您有3个数据库实例的Neo4j
-
neo4j(默认)
-
foo
-
bar
并且您希望在所有实例上启用Sink插件,您可以简单地省略任何关于启用它的配置,您只需要为每个实例提供路由配置。
streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.neo4j=MERGE (c:MyLabel{myId: event.myId}) SET c += event.properties
否则,如果您只想在customers
和products
实例上启用Sink插件,则可以按以下方式执行
streams.sink.enabled=false
streams.sink.enabled.to.foo=true
streams.sink.enabled.to.bar=true
streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET c += event.properties
因此,一般来说,如果您有
streams.sink.enabled=true
streams.sink.enabled.to.foo=false
那么除了foo之外,所有数据库上都启用了sink(本地覆盖全局)