来源:Neo4j → Kafka
Kafka Connect Neo4j Connector 是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 已经不再积极开发,并且在 Neo4j 4.4 版本之后将不再提供支持。 最新的 Kafka Connect Neo4j Connector 版本可以在 此处 找到。 |
交易事件处理程序是将数据发送到 Kafka 主题的事件
配置
您可以在您的 neo4j.conf
中设置以下配置值,以下为默认值。
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=300000
kafka.streams.log.compaction.strategy=delete
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled=<true/false, default=true>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
streams.procedures.enabled.from.<DB_NAME>=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>
要使用 Kafka 事务,请正确设置 |
有关这些设置的详细信息,请参阅 Apache Kafka 文档。
如果您 Kafka 代理已配置 auto.create.topics.enable
为 false
,则发送到不存在的主题的所有消息都将被丢弃;这是因为 KafkaProducer.send()
方法阻塞了执行,如 KAFKA-3539 中所述。您可以调整自定义属性 kafka.topic.discovery.polling.interval
以定期检查 Kafka 集群中的新主题,以便插件能够将事件发送到定义的主题。
使用 kafka.streams.log.compaction.strategy=delete
将使用 Neo4j Streams Source 生成一系列唯一密钥。而使用 kafka.streams.log.compaction.strategy=compact
,密钥将被调整以在 Kafka 侧启用 日志压缩。请注意,删除策略不会实际删除记录,它具有此名称以匹配主题配置 cleanup.policy=delete/compact
。也就是说,涉及相同节点或关系的操作将具有相同的密钥。
当 kafka.streams.log.compaction.strategy=compact
时,我们将利用内部 Kafka 机制进行分区。
请参阅 消息结构 部分,以查看密钥示例
多数据库支持
Neo4j 4.0 企业版具有 多租户支持,为了支持此功能,您可以为每个数据库实例设置一个配置后缀,该后缀遵循以下模式 from.<DB_NAME>
,用于您 neo4j.conf 文件中的属性。
以下是允许支持多租户的新属性列表
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<PATTERN>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
这意味着对于每个数据库实例,您可以指定是否
-
使用源连接器
-
路由模式
因此,如果您有一个实例名称 foo
,则可以以这种方式指定配置
streams.source.topic.nodes.myTopic.from.foo=<PATTERN>
streams.source.topic.relationships.myTopic.from.foo=<PATTERN>
streams.source.enabled.from.foo=<true/false, default=true>
旧属性
streams.source.enabled=<true/false, default=true>
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.procedures.enabled=<true/false, default=true>
仍然有效,它们指的是 Neo4j 的默认数据库实例。
默认数据库由 Neo4j 的 dbms.default_database 配置属性控制,因此我们明确了适用于此用户的默认数据库。数据库名称不区分大小写并规范化为小写,并且必须遵循 Neo4j 数据库命名规则。(参考:https://neo4j.ac.cn/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration) |
特别是,以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数,则使用以下属性
streams.source.enabled=<true/false, default=true>
这意味着如果您拥有 3 个数据库实例的 Neo4j
-
neo4j(默认)
-
foo
-
bar
并且您希望在所有实例上启用源插件,则可以简单地省略有关启用它的任何配置,您只需要为每个实例提供路由配置即可
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}
否则,如果您只希望在 foo
和 bar
实例上启用源插件,则可以按以下方式执行
streams.source.enabled=false
streams.source.enabled.from.foo=true
streams.source.enabled.from.bar=true
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}
如您所见,如果您只希望在 一个或多个特定数据库实例上启用源插件,则必须先禁用源插件 ( |
因此,一般来说,如果您有
streams.source.enabled=true
streams.source.enabled.from.foo=false
那么源模块将在所有数据库上启用,除了 foo
(本地覆盖全局)
仅出于示例目的,想象一下以下情况 您有一个 Neo4j 实例,没有安装 Neo4j Streams,其中创建并填充了数据库“testdb”。您决定安装 Neo4j Streams 插件,因为我们希望将图数据也放入 Kafka。因此,您添加以下配置
这样做后,您希望将所有具有标签
第二点之所以发生,是因为由于数据库“testdb”已经填充,通过启用源模块 ( 如果您希望关闭此默认行为,则必须禁用“通用”源模块,并仅为感兴趣的数据库启用它
|
序列化器
为了允许以任何格式插入密钥(例如,通过 streams.publish
过程),key.serializer
使用 org.apache.kafka.common.serialization.ByteArraySerializer
设置,就像 value.serializer
一样
消息结构
消息密钥结构取决于 kafka.streams.log.compaction.strategy
。
使用删除时为字符串:“${meta.txId + meta.txEventId}-${meta.txEventId}”。
"[txId+txEventId] - txEventId "
其中
-
txId
标识影响实体的事务 -
txEventId
是一个计数器,标识 Neo4j 处理特定事件的内部顺序 -
[txId+txEventId] 是前面两个值的数字总和
而使用压缩时
对于没有约束标签的节点,密钥是节点 ID 的字符串值。
对于具有约束标签的节点,密钥是具有 {ids: mapOfConstaint , labels: listOfLabels}
的 JSON。
例如,使用以下配置
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
此约束
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE
以及此查询
CREATE (:Person {name:'Sherlock', surname: 'Holmes'})
我们将获得此密钥
{"ids": {"name": "Sherlock"}, "labels": ["Person"]}
否则,使用与上面相同的配置和查询,但使用以下约束
CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY
我们将获得此密钥
{"ids": {"surname": "Holmes", "name": "Sherlock"}, "labels": ["Person"]}
对于关系,密钥是具有 {start: START_NODE , end: END_NODE, label: typeOfRelationship}
的 JSON。
START_NODE 和 END_NODE 节点遵循与上面相同的规则。
例如,使用以下配置
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
这些约束
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;
以及这些查询
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
我们将获得此密钥
{"start": {"ids": {"name": "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code": "1367"}, "labels": ["Product"]},
"label": "BUYS"}
否则,使用以下配置
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
没有约束,以及以下查询
CREATE (:Person {name:'Pippo'})
我们将获得此密钥
{"start": "0", "end": "1", "label": "BUYS"}
对于具有开始或结束节点上的多个约束的关系, |
模式
节点
要控制哪些节点被发送到 Kafka,以及哪些节点的属性,您可以在配置中定义节点模式。
您可以选择标签和属性进行包含或排除,其中 *
表示 **所有**。
模式之间用分号 ;
分隔。
基本语法为
Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
模式 | 表示 |
---|---|
|
所有具有此标签的节点及其所有属性都将发送到相关主题 |
|
具有这两个标签的节点将发送到相关主题 |
|
具有此标签的所有节点的 |
|
在具有标签 |
关系
要控制哪些关系被发送到 Kafka,以及哪些关系的属性,您可以在配置中定义关系模式。
您可以选择类型和属性进行包含或排除,其中 *
表示 **所有**。
模式之间用分号 ;
分隔。
基本语法为
KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
模式 | 表示 |
---|---|
|
所有与该标签相关的关系及其所有属性都将发送到相关主题 |
|
所有与该类型相关的关系的 |
|
在类型为KNOWS的关系中,属性 |
关系键策略
请参阅键策略部分。
事务事件处理程序
事务事件处理程序是流生产者的核心,允许流式传输数据库更改。
事件
生产者流式传输三种类型的事件
-
创建:当节点/关系/属性被创建时
-
更新:当节点/关系/属性被更新时
-
删除:当节点/关系/属性被删除时
创建
以下是一个节点创建事件的示例
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
更新
以下是一个节点更新事件的示例
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "updated",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"before": {
"labels": ["Person", "Tmp"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne"
}
},
"after": {
"labels": ["Person"],
"properties": {
"last_name": "Kretchmar",
"email": "annek@noanswer.org",
"first_name": "Anne Marie",
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String",
"geo": "point"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "updated",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"before": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]",
"to": "2019-04-05T23:00:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime",
"to": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
删除
以下是一个节点创建事件的示例
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "deleted",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"before": {
"labels": ["Person"],
"properties": {
"last_name": "Kretchmar",
"email": "annek@noanswer.org",
"first_name": "Anne Marie",
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String",
"geo": "point"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "deleted",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"before": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]",
"to": "2019-04-05T23:00:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime",
"to": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
元数据
元数据字段包含与事务事件相关的元数据
字段 | 类型 | 描述 |
---|---|---|
时间戳 |
数字 |
与事务事件相关的时间戳 |
用户名 |
字符串 |
生成事务的用户名 |
tx_id |
数字 |
Neo4j 事务管理器提供的交易 ID |
tx_event_count |
数字 |
包含在事务中的事件数量(例如,节点上的 2 次更新,1 次关系创建) |
tx_event_id |
数字 |
事务中事件的 ID |
操作 |
枚举["created", "updated", "deleted"] |
操作类型 |
来源 |
对象 |
包含有关来源的信息 |
有效载荷
有效载荷字段包含有关与事件相关的数据的信息
字段 | 类型 | 描述 |
---|---|---|
ID |
数字 |
图实体的 ID |
类型 |
枚举["node", "relationship"] |
图实体的类型 |
之前 |
对象 |
事务事件之前的數據 |
之后 |
对象 |
事务事件之后的數據 |