来源:Neo4j → Kafka

Kafka Connect Neo4j Connector 是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 已经不再积极开发,并且在 Neo4j 4.4 版本之后将不再提供支持。

最新的 Kafka Connect Neo4j Connector 版本可以在 此处 找到。

交易事件处理程序是将数据发送到 Kafka 主题的事件

配置

您可以在您的 neo4j.conf 中设置以下配置值,以下为默认值。

neo4j.conf(使用默认值)
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=300000
kafka.streams.log.compaction.strategy=delete

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled=<true/false, default=true>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
streams.procedures.enabled.from.<DB_NAME>=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>

要使用 Kafka 事务,请正确设置 kafka.transactional.idkafka.acks。查看此 博客文章,以了解有关 Apache Kafka 中事务的更多详细信息

有关这些设置的详细信息,请参阅 Apache Kafka 文档

如果您 Kafka 代理已配置 auto.create.topics.enablefalse,则发送到不存在的主题的所有消息都将被丢弃;这是因为 KafkaProducer.send() 方法阻塞了执行,如 KAFKA-3539 中所述。您可以调整自定义属性 kafka.topic.discovery.polling.interval 以定期检查 Kafka 集群中的新主题,以便插件能够将事件发送到定义的主题。

使用 kafka.streams.log.compaction.strategy=delete 将使用 Neo4j Streams Source 生成一系列唯一密钥。而使用 kafka.streams.log.compaction.strategy=compact,密钥将被调整以在 Kafka 侧启用 日志压缩。请注意,删除策略不会实际删除记录,它具有此名称以匹配主题配置 cleanup.policy=delete/compact。也就是说,涉及相同节点或关系的操作将具有相同的密钥。

kafka.streams.log.compaction.strategy=compact 时,我们将利用内部 Kafka 机制进行分区。

请参阅 消息结构 部分,以查看密钥示例

多数据库支持

Neo4j 4.0 企业版具有 多租户支持,为了支持此功能,您可以为每个数据库实例设置一个配置后缀,该后缀遵循以下模式 from.<DB_NAME>,用于您 neo4j.conf 文件中的属性。

以下是允许支持多租户的新属性列表

streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<PATTERN>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>

这意味着对于每个数据库实例,您可以指定是否

  • 使用源连接器

  • 路由模式

因此,如果您有一个实例名称 foo,则可以以这种方式指定配置

streams.source.topic.nodes.myTopic.from.foo=<PATTERN>
streams.source.topic.relationships.myTopic.from.foo=<PATTERN>
streams.source.enabled.from.foo=<true/false, default=true>

旧属性

streams.source.enabled=<true/false, default=true>
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.procedures.enabled=<true/false, default=true>

仍然有效,它们指的是 Neo4j 的默认数据库实例。

默认数据库由 Neo4j 的 dbms.default_database 配置属性控制,因此我们明确了适用于此用户的默认数据库。数据库名称不区分大小写并规范化为小写,并且必须遵循 Neo4j 数据库命名规则。(参考:https://neo4j.ac.cn/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)

特别是,以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数,则使用以下属性

streams.source.enabled=<true/false, default=true>

这意味着如果您拥有 3 个数据库实例的 Neo4j

  • neo4j(默认)

  • foo

  • bar

并且您希望在所有实例上启用源插件,则可以简单地省略有关启用它的任何配置,您只需要为每个实例提供路由配置即可

streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}

否则,如果您只希望在 foobar 实例上启用源插件,则可以按以下方式执行

streams.source.enabled=false
streams.source.enabled.from.foo=true
streams.source.enabled.from.bar=true
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}

如您所见,如果您只希望在 一个或多个特定数据库实例上启用源插件,则必须先禁用源插件 (streams.source.enabled=false),然后仅在所需的实例上启用它(例如 streams.source.enabled.from.foo=true)。此外,请注意 streams.source.topic.nodes.testTopic=Test{testId} 不会被考虑,因为默认数据库实例 neo4j 上的源插件已被禁用。

因此,一般来说,如果您有

streams.source.enabled=true
streams.source.enabled.from.foo=false

那么源模块将在所有数据库上启用,除了 foo(本地覆盖全局)

仅出于示例目的,想象一下以下情况

您有一个 Neo4j 实例,没有安装 Neo4j Streams,其中创建并填充了数据库“testdb”。您决定安装 Neo4j Streams 插件,因为我们希望将图数据也放入 Kafka。因此,您添加以下配置

kafka.bootstrap.servers=localhost:9092
streams.source.enabled=true
streams.sink.enabled=false
streams.procedures.enabled=true
streams.source.topic.nodes.topicTest.from.testdb=Test{*}

这样做后,您希望将所有具有标签 Test 的已创建/更新的节点反映到主题 topicTest 中。实际发生的情况是

  • 在 Kafka 设置之前插入的所有节点和关系都会反映到一个名为“testdb”的主题中,该主题是使用数据库名称默认创建的。

  • 在 Kafka 设置之后,所有具有标签 Test 的已创建/更新的节点都会反映到已配置的主题 topicTest 中。

第二点之所以发生,是因为由于数据库“testdb”已经填充,通过启用源模块 (streams.source.enabled=true),插件将创建一个与数据库名称同名的默认主题,并将所有“testdb”数据反映到其中。

如果您希望关闭此默认行为,则必须禁用“通用”源模块,并仅为感兴趣的数据库启用它

kafka.bootstrap.servers=localhost:9092
streams.source.enabled=false
streams.sink.enabled=false
streams.procedures.enabled=true
streams.source.enabled.from.test1=true
streams.source.topic.nodes.topicTest.from.testdb=Test{*}

序列化器

为了允许以任何格式插入密钥(例如,通过 streams.publish 过程),key.serializer 使用 org.apache.kafka.common.serialization.ByteArraySerializer 设置,就像 value.serializer 一样

消息结构

消息密钥结构取决于 kafka.streams.log.compaction.strategy

使用删除时为字符串:“${meta.txId + meta.txEventId}-${meta.txEventId}”。

"[txId+txEventId] - txEventId "

其中

  • txId 标识影响实体的事务

  • txEventId 是一个计数器,标识 Neo4j 处理特定事件的内部顺序

  • [txId+txEventId] 是前面两个值的数字总和

 

而使用压缩时

对于没有约束标签的节点,密钥是节点 ID 的字符串值。

对于具有约束标签的节点,密钥是具有 {ids: mapOfConstaint , labels: listOfLabels} 的 JSON。

例如,使用以下配置

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

此约束

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE

以及此查询

CREATE (:Person {name:'Sherlock', surname: 'Holmes'})

我们将获得此密钥

{"ids": {"name": "Sherlock"}, "labels": ["Person"]}

 

否则,使用与上面相同的配置和查询,但使用以下约束

CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY

我们将获得此密钥

{"ids": {"surname": "Holmes", "name":  "Sherlock"}, "labels": ["Person"]}

 

对于关系,密钥是具有 {start: START_NODE , end: END_NODE, label: typeOfRelationship} 的 JSON。
START_NODE 和 END_NODE 节点遵循与上面相同的规则。

例如,使用以下配置

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

这些约束

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;

以及这些查询

CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);

我们将获得此密钥

{"start": {"ids": {"name":  "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code":  "1367"}, "labels": ["Product"]},
 "label": "BUYS"}

 

否则,使用以下配置

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

没有约束,以及以下查询

CREATE (:Person {name:'Pippo'})

我们将获得此密钥

{"start": "0", "end": "1", "label": "BUYS"}

对于具有开始或结束节点上的多个约束的关系,ids 字段取决于 streams.source.topic.relationships.<TOPIC_NAME>.key_strategy 配置。请参阅“密钥策略”部分以了解更多详细信息

模式

节点

要控制哪些节点被发送到 Kafka,以及哪些节点的属性,您可以在配置中定义节点模式。

您可以选择标签和属性进行包含或排除,其中 * 表示 **所有**。

模式之间用分号 ; 分隔。

基本语法为

Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
模式 表示

Label{*}

所有具有此标签的节点及其所有属性都将发送到相关主题

Label1:Label2

具有这两个标签的节点将发送到相关主题

Label{prop1,prop2}

具有此标签的所有节点的 prop1prop2 将发送到相关主题

Label{-prop1,-prop2}

在具有标签 Label 的节点中,属性 prop1prop2 将被排除

关系

要控制哪些关系被发送到 Kafka,以及哪些关系的属性,您可以在配置中定义关系模式。

您可以选择类型和属性进行包含或排除,其中 * 表示 **所有**。

模式之间用分号 ; 分隔。

基本语法为

KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
模式 表示

KNOWS{*}

所有与该标签相关的关系及其所有属性都将发送到相关主题

MEET{prop1,prop2}

所有与该类型相关的关系的prop1prop2属性都被发送到相关主题

ANSWER{-prop1,-prop2}

在类型为KNOWS的关系中,属性prop1prop2被排除在外

关系键策略

请参阅键策略部分。

事务事件处理程序

事务事件处理程序是流生产者的核心,允许流式传输数据库更改。

事件

生产者流式传输三种类型的事件

  • 创建:当节点/关系/属性被创建时

  • 更新:当节点/关系/属性被更新时

  • 删除:当节点/关系/属性被删除时

创建

以下是一个节点创建事件的示例

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

更新

以下是一个节点更新事件的示例

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person", "Tmp"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

删除

以下是一个节点创建事件的示例

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

元数据

元数据字段包含与事务事件相关的元数据

字段 类型 描述

时间戳

数字

与事务事件相关的时间戳

用户名

字符串

生成事务的用户名

tx_id

数字

Neo4j 事务管理器提供的交易 ID

tx_event_count

数字

包含在事务中的事件数量(例如,节点上的 2 次更新,1 次关系创建)

tx_event_id

数字

事务中事件的 ID

操作

枚举["created", "updated", "deleted"]

操作类型

来源

对象

包含有关来源的信息

来源

字段 类型 描述

主机名

字符串

有关来源的信息

有效载荷

有效载荷字段包含有关与事件相关的数据的信息

字段 类型 描述

ID

数字

图实体的 ID

类型

枚举["node", "relationship"]

图实体的类型

之前

对象

事务事件之前的數據

之后

对象

事务事件之后的數據

有效载荷:之前和之后

我们必须区分两种情况

节点

字段 类型 描述

标签

字符串[]

附加到节点的标签列表

属性

Map<K, V>

附加到节点的属性列表,K是属性名称

关系

字段 类型 描述

标签

字符串

关系类型

属性

Map<K,V>

附加到关系的属性列表,K是属性名称

开始

对象

关系的起始节点

结束

对象

关系的结束节点

关系:起始节点和结束节点

字段 类型 描述

ID

数字

节点的 ID

标签

字符串[]

附加到节点的标签列表

ID

Map<K,V>

与节点的定义约束(UNIQUENESS和/或NODE_KEY)相关的 ID。K是属性名称,V是相关值

模式

字段 类型 描述

约束

Object[]

附加到实体的约束列表

属性

Map<K, V>

附加到实体的属性列表,其中K是属性名称,V是类类型

约束

节点和关系可以有一系列附加到它们的约束

表 1. 约束
字段 类型 描述

标签

字符串

附加到约束的标签

类型

枚举["UNIQUE", "NODE_PROPERTY_EXISTS", "RELATIONSHIP_PROPERTY_EXISTS"]

约束类型

属性

字符串[]

参与约束的属性列表