Neo4j Streams - Sink:Kafka → Neo4j

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发状态,并且在 Neo4j 4.4 版之后将不再受支持。

Kafka Connect Neo4j 连接器的最新版本可以在这里找到 这里

是 Kafka Sink,它将数据直接导入 Neo4j

工作原理

它通过多种方式工作

  • 通过提供 Cypher 模板

  • 通过通过更改数据捕获模块获取来自另一个 Neo4j 实例发出的事件

  • 通过向 JSON 或 AVRO 文件提供模式提取

  • 通过管理 CUD 文件格式

Cypher 模板策略是唯一保证消息按到达主题的顺序处理的 Sink 策略。

其他 Sink 策略按操作类型对消息进行分组,这也可以优化为批处理。在这种情况下,执行顺序如下

  1. 所有节点上的 MERGE 操作

  2. 所有节点上的 DELETE 操作

  3. 所有关系上的 MERGE 操作

  4. 所有关系上的 DELETE 操作

Cypher 模板

它使用存储在以下格式的属性中的模板 Cypher 查询

neo4j.conf
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>

每个 Cypher 模板都必须引用一个将由 Sink 注入的 事件 对象

以下是一个示例

对于此事件

{
 "id": 42,
 "properties": {
   "title": "Answer to anyting",
   "description": "It depends."
 }
}
neo4j.conf
streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) \
ON CREATE SET n += event.properties

在后台,Sink 以这种方式将事件对象作为参数注入

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

其中 {events} 是一个 json 列表,因此继续以上面的示例,可能的完整表示可能如下

:params events => [{id:"alice@example.com",properties:{name:"Alice",age:32}},
    {id:"bob@example.com",properties:{name:"Bob",age:42}}]

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

当您决定使用 Cypher 模板作为 Sink 策略将数据从 Kafka 导入 Neo4j 时,您必须确保查询的正确性。如果查询未优化,这也会导致可能的性能问题或插件似乎卡住的情况,例如,如果查询将大量节点和关系加载到内存中。我们建议以下操作

  • 执行 EXPLAIN 查询以更好地分析查询并避免此类情况

  • 如果 Neo4j 似乎卡住了,则从 Neo4j 浏览器执行 CALL dbms.listQueries() 以查看当前在实例中执行的所有查询,并确保没有锁定的查询

更改数据捕获事件

此方法允许获取来自另一个 Neo4j 实例的 CDC 事件。您可以使用两种策略

  • SourceId 策略,它通过 CDC 事件 id 字段(与 Neo4j 物理 ID 相关)合并节点/关系

  • Schema 策略,它通过图模型中定义的约束(唯一性、节点键)合并节点/关系

SourceId 策略

您可以按以下方式配置主题

streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
streams.sink.topic.cdc.sourceId=my-topic;my-other.topic

每个 streams 事件都将投影到相关图实体中,例如以下事件

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

将持久化为以下节点

Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}

正如您所注意到的,摄取的事件已使用两种特性进行了投影

  • id 字段已转换为 sourceId

  • 节点具有附加标签 SourceEvent

这两个字段将用于匹配节点/关系以进行将来的更新/删除

Schema 策略

您可以按以下方式配置主题

streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=my-topic;my-other.topic

每个 streams 事件都将投影到相关图实体中,例如以下事件

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

将持久化为以下节点

Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}

Schema 策略利用 schema 字段插入/更新节点,因此不会创建额外的字段。

在关系的情况下

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

Schema 策略利用 ids 字段插入/更新关系,因此不会创建额外的字段。

Pattern 策略

Pattern 策略允许您通过提供提取模式从 json 中提取节点和关系

每个属性都可以以以下方式为前缀

  • !:标识 id(可以有多个属性),这是 必需的

  • -:从提取中排除属性如果没有指定前缀,则表示将包含该属性

您不能混合包含和排除,因此您的模式必须包含所有排除或包含属性

标签可以通过 : 连接

墓碑记录管理

模式策略支持 墓碑记录,为了利用它,您的事件应包含您要删除的记录作为键,以及 null 作为值。

目前,您无法连接多个模式(例如,如果您只使用一个主题并生成多个节点/关系类型)。因此,您必须为每种类型的节点/关系使用不同的主题,并为每个主题定义一个模式

节点模式 配置

您可以按以下方式配置节点模式提取

streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>

例如,给定通过 user 主题发布的以下 json

{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}

您可以通过提供以下配置将其转换为节点

通过指定一个更简单的模式

streams.sink.topic.pattern.node.user=User{!userId}

或者通过指定一个类似 Cypher 的节点模式

streams.sink.topic.pattern.node.user=(:User{!userId})

与 CDC 模式类似,您可以提供

模式 含义

User:Actor{!userId} 或 User:Actor{!userId,*}

userId 将用作 ID 字段,并且 json 的所有属性都将附加到具有提供的标签(UserActor)的节点,因此持久化的节点将是:(User:Actor{userId: 1, name: 'Andrea', surname: 'Santurbano', address.city: 'Venice', address.cap: 30100})

User{!userId, surname}

userId 将用作 ID 字段,并且 json 的 surname 属性将附加到具有提供的标签(User)的节点,因此持久化的节点将是:(User{userId: 1, surname: 'Santurbano'})

User{!userId, surname, address.city}

userId 将用作 ID 字段,并且 json 的 surname 和 address.city 属性将附加到具有提供的标签(User)的节点,因此持久化的节点将是:(User{userId: 1, surname: 'Santurbano', address.city: 'Venice'})

User{!userId,-address}

userId 将用作 ID 字段,并且 address 属性将被排除,因此持久化的节点将是:(User{userId: 1, name: 'Andrea', surname: 'Santurbano'})

关系模式 配置

您可以按以下方式配置关系模式提取

streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>

例如,给定通过 user 主题发布的以下 json

{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}

您可以通过提供以下配置将其转换为路径,例如 (n)-[r]→(m)

通过指定一个更简单的模式

streams.sink.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}

或者通过指定一个类似 Cypher 的节点模式

streams.sink.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})

在最后一种情况下,我们假设 User 是源节点,Product 是目标节点

与 CDC 模式类似,您可以提供

模式 含义

(User{!userId})-[:BOUGHT]→(Product{!productId}) 或 (User{!userId})-[:BOUGHT{price, currency}]→(Product{!productId})

这将根据提供的标识符合并获取/创建这两个节点以及它们之间的BOUGHT关系。然后设置所有其他json属性,以便持久化数据将为:(User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice', shippingAddress.cap: 30100}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price}]→(Product{!productId})

这将根据提供的标识符合并获取/创建这两个节点以及它们之间的BOUGHT关系。然后设置所有指定的json属性,以便持久化模式将为:(User{userId: 1})-[:BOUGHT{price: 10}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{-shippingAddress}]→(Product{!productId})

这将根据提供的标识符合并获取/创建这两个节点以及它们之间的BOUGHT关系。然后设置所有指定的json属性(通过排除),以便持久化模式将为:(User{userId: 1})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price,currency, shippingAddress.city}]→(Product{!productId})

这将根据提供的标识符合并获取/创建这两个节点以及它们之间的BOUGHT关系。然后设置所有指定的json属性,以便持久化模式将为:(User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice'}]→(Product{productId: 100})

将属性附加到节点

默认情况下,不会将任何属性附加到边缘节点,但您可以指定要附加到每个节点的属性。假设以下通过user主题发布的json

{
    "userId": 1,
    "userName": "Andrea",
    "userSurname": "Santurbano",
    "productId": 100,
    "productName": "My Awesome Product!",
    "price": 10,
    "currency": "€"
}
模式 含义

(User{!userId, userName, userSurname})-[:BOUGHT]→(Product{!productId, productName})

这将合并两个节点以及它们之间的BOUGHT关系,并包含所有json属性,以便持久化模式将为:(User{userId: 1, userName: 'Andrea', userSurname: 'Santurbano'})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100, name: 'My Awesome Product!'})

CUD文件格式

CUD文件格式是JSON文件,表示图实体(节点/关系)以及如何以Create/Update/Delete操作的方式管理它们。

您可以按以下方式配置主题

streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=my-topic;my-other.topic

我们有两种格式

  • 一种用于节点

    我们提供了一个MERGE操作的示例

    {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }

它将转换为以下Cypher查询

UNWIND [..., {
  "op": "merge",
  "properties": {
    "foo": "value",
    "key": 1
  },
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": true
}, ...] AS event
MERGE (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
SET n += event.properties

让我们描述这些字段

表1. CUD文件节点格式字段描述
字段 必填 描述

op

操作类型:create/merge/update/delete

注意:delete消息用于单个节点,它并非旨在成为从JSON构建Cypher查询的通用方法。

properties

如果操作为delete则否,否则是

附加到节点的属性

ids

如果操作为create则否,否则是

如果操作为merge/update/delete,则此字段为必填,并包含将用于查找实体的节点的主键/唯一键。如果您使用_id作为键,则cud格式将引用Neo4j的节点内部ID进行节点查找。

注意:如果您将_id引用与op merge一起使用,它将起作用简单的更新,这意味着如果具有传递的内部ID的节点不存在,则不会创建它。

labels

附加到节点的标签。

注意:Neo4j允许创建没有标签的节点,但从性能角度来看,这是一个不好的主意,不要提供它们。

type

实体类型:node/relationship ⇒ 在本例中为node

detach

如果操作为delete,您可以指定是否执行“detach”删除,这意味着在删除节点时删除任何关联关系

注意:如果未提供值,则默认为true

  • 以及一种用于关系

我们提供了一个CREATE操作的示例

{
  "op": "create",
  "properties": {
    "foo": "rel-value",
    "key": 1
  },
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}

它将转换为以下Cypher查询

UNWIND [..., {
  "op": "create",
  "properties": {
    "foo": "rel-value",
    "key": 1
  },
  "rel-type": "MY-REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties

让我们描述这些字段

表2. CUD文件关系格式字段描述
字段 必填 描述

op

操作类型:create/merge/update/delete

properties

附加到关系的属性

rel_type

关系类型

from

是,如果您在ids中使用_id字段引用,则可以将标签留空

包含有关关系源节点的信息。有关idslabels字段的描述,请参阅上面的节点字段描述

to

是,如果您在ids中使用_id字段引用,则可以将标签留空

包含有关关系目标节点的信息。有关idslabels字段的描述,请参阅上面的节点字段描述

type

实体类型:node/relationship ⇒ 在本例中为relationship

以下是节点和关系的DELETE操作的另一个示例。

  • 对于节点,以下JSON

{
  "op": "delete",
  "properties": {},
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": false
}

将转换为以下Cypher查询

UNWIND [..., {
  "op": "delete",
  "properties": {},
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": false
}, ...] AS event
MATCH (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
DELETE n

请注意,如果您设置"detach": true,则转换将为

UNWIND [
...
] AS event
...
DETACH DELETE n
  • 对于关系,以下JSON

{
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}

将转换为以下Cypher查询

UNWIND [..., {
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherkey: event.to.ids.otherkey})
MATCH (from)-[r:MY_REL]->(to)
DELETE r

我们可以在关系创建/合并时创建不存在的节点,在"from"和/或"to"字段中放入"op": "merge"。默认情况下,"op"为match,因此如果节点不存在,则不会创建它。例如,我们可以编写

{
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"],
    "op": "merge"
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"],
    "op": "merge"
  },
  "type":"relationship"
}

如何处理错误数据

Neo4j Streams插件提供了多种处理处理错误的方法。

它可以快速失败或记录不同详细程度的错误。另一种方法是将所有数据和错误重新路由到死信队列,因为某些原因无法将其摄取。

它默认的行为类似于Kafka Connect,请参阅此博文
  • 默认情况下快速失败(中止)

  • 需要配置死信队列主题以启用

  • 需要显式启用日志记录

  • 必须显式启用报头和消息日志记录

配置选项

表3. 死信队列配置参数
名称 注意

errors.tolerance

none

快速失败(默认!)中止

errors.tolerance

all

all == 宽容,静默忽略错误消息

errors.log.enable

false/true

记录错误(默认:false)

errors.log.include.messages

false/true

也记录错误消息(默认:false)

errors.deadletterqueue.topic.name

topic-name

死信队列主题名称,如果省略则没有DLQ,默认:未设置

errors.deadletterqueue.context.headers.enable

false/true

使用元数据报头(如异常、时间戳、org.topic、org.part)丰富消息,默认:false

errors.deadletterqueue.context.headers.prefix

prefix-text

报头条目的通用前缀,例如"__streams.errors.",默认:未设置

errors.deadletterqueue.topic.replication.factor

3/1

复制因子,需要设置为1以获得单个分区,默认:3

对于Neo4j扩展,您在Neo4j配置中以streams.sink作为前缀。

示例设置

快速失败,中止
errors.tolerance=none
不要在错误时失败,使用消息记录
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
不要在错误时失败,不要记录,但将带报头发送到DLQ
errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq-topic
errors.deadletterqueue.context.headers.enable=true
Neo4j服务器插件的相同设置
streams.sink.errors.tolerance=all
streams.sink.errors.deadletterqueue.topic.name=my-dlq-topic
streams.sink.errors.deadletterqueue.context.headers.enable=true

死信队列中发布的每个记录都包含原始记录的KeyValue对,以及可选的以下报头

报头键 描述

<prefix>topic

发布数据所在的主题

<prefix>partition

发布数据所在的主题分区

<prefix>soffset

数据在主题分区中的偏移量

<prefix>class.name

生成错误的类

<prefix>exception.class.name

生成错误的异常

<prefix>exception.message

异常消息

<prefix>exception.stacktrace"

异常堆栈跟踪

<prefix>databaseName"

数据库名称

支持的Kafka反序列化器

Neo4j Streams插件支持2个反序列化器

  • org.apache.kafka.common.serialization.ByteArrayDeserializer:如果您想管理JSON消息

  • io.confluent.kafka.serializers.KafkaAvroDeserializer:如果您想管理AVRO消息

您可以根据配置段落中指定的KeyValue分别定义它们

配置摘要

您可以在neo4j.conf中设置以下Kafka配置值,以下是默认值。

neo4j.conf
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>

streams.check.apoc.timeout=<ms to await for APOC being loaded, default -1 skip the wait>
streams.check.apoc.interval=<ms interval awaiting for APOC being loaded, default 1000>
streams.sink.poll.interval=<The delay interval between poll cycles, default 0>

有关这些设置的详细信息,请参阅Apache Kafka文档

如果streams.cluster.only设置为true,则流将在单实例模式下或在备份操作的上下文中拒绝启动。这是确保在生产部署中意外情况下不发生操作的重要安全保护措施。

有关这些设置的详细信息,请参阅Apache Kafka文档

自定义Kafka配置

在本节中,我们将描述特定Neo4j流Kafka配置的含义

kafka.streams.async.commit

如果kafka.enable.auto.commit=false,此属性允许您管理如何将消息提交到主题。

可能的值

  • false(默认)在幕后,我们使用Kafka Consumer的commitSync方法

  • true在幕后,我们使用Kafka Consumer的commitAsync方法

commitSync VS commitAsync

commitSync是同步提交,将在提交成功或遇到不可恢复的错误(在这种情况下,它将抛给调用方)之前阻塞。

这意味着,commitSync是一个具有内部重试机制的阻塞方法,这可能会影响摄取的性能,因为只有在提交结束时才会处理新一批消息。

另一方面,commitAsync是一个异步调用(因此不会阻塞),并且不提供内部重试机制。

权衡:延迟与数据一致性

如果您需要确保数据一致性,请选择commitSync,因为它会确保在执行任何进一步操作之前,您将知道偏移量提交是否成功或失败。但由于它是同步且阻塞的,您将花费更多时间等待提交完成,从而导致高延迟。如果您不介意一定程度的数据不一致并且想要低延迟,请选择commitAsync,因为它不会等待完成。

多数据库支持

Neo4j 4.0 企业版具有多租户支持,为了支持此功能,您可以为每个数据库实例设置一个配置后缀,其模式为to.<DB_NAME>,添加到neo4j.conf文件中的属性中。

以下是允许支持多租户的新属性列表

streams.sink.topic.cypher.<TOPIC_NAME>.to.<DB_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.<DB_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.<DB_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.<DB_NAME>=<true/false, default=false>

请注意streams.sink.enabled.to.<DB_NAME>属性。默认情况下它为false,因为Sink模块默认情况下也已禁用。如果Sink模块已启用(streams.sink.enabled=true),则它将对所有数据库启用。因此,如果您只想对一个或多个数据库启用/禁用Sink模块,则必须为每个数据库指定streams.sink.enabled.to.<DB_NAME>属性。

这意味着对于每个数据库实例,您可以指定是否

  • 使用源连接器

  • 路由模式

因此,如果您有一个名为foo的实例,则可以这样指定配置

streams.sink.topic.cypher.<TOPIC_NAME>.to.foo=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.foo=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.foo=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.foo=<true/false, default=false>

旧属性

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>

仍然有效,并且它们引用Neo4j的默认数据库实例,该实例通常称为neo4j,但可以通过单独的Neo4j系统配置进行控制。

默认数据库由Neo4j的dbms.default_database配置属性控制,因此我们明确了适用于此用户的默认数据库。数据库名称不区分大小写,并规范化为小写,并且必须遵循Neo4j数据库命名规则。(参考:https://neo4j.ac.cn/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)

特别是以下属性将用作非默认数据库实例的默认值,如果未提供特定配置参数。

streams.sink.enabled=<true/false, default=false>

这意味着如果您有3个数据库实例的Neo4j

  • neo4j(默认)

  • foo

  • bar

并且您希望在所有实例上启用Sink插件,您可以简单地省略任何关于启用它的配置,您只需要为每个实例提供路由配置。

streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.neo4j=MERGE (c:MyLabel{myId: event.myId}) SET c += event.properties

否则,如果您只想在customersproducts实例上启用Sink插件,则可以按以下方式执行

streams.sink.enabled=false
streams.sink.enabled.to.foo=true
streams.sink.enabled.to.bar=true
streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET c += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET c += event.properties

因此,一般来说,如果您有

streams.sink.enabled=true
streams.sink.enabled.to.foo=false

那么除了foo之外,所有数据库上都启用了sink(本地覆盖全局)