查询策略

查询策略允许用户定义自己的 Cypher 查询来提取更改。这需要对图模式(schema)做出相应调整,例如通过专用的变更跟踪属性(如节点或关系上的时间戳)来跟踪更改,或使用软删除来跟踪实体的删除(因为已被物理删除的实体无法再被查询返回)。

配置

首先,您需要为连接器实例选择 QUERY 策略:

"neo4j.source-strategy": "QUERY"

其次,您需要定义用于跟踪更改的查询,以及这些更改要发布到的主题:

"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 接收消息的主题名称。
2 一个 Cypher 查询,它返回自上次迭代以来发生更改的实体;上一次检查时的游标值通过 `$lastCheck` 参数传入查询。
3 用作游标来跟踪更改的属性(字段名称)。该字段必须包含在查询的返回结果中。

创建源实例

基于上面的示例,您可以使用以下任一配置。以下四个示例分别对应 Avro、JSON Schema、JSON String 和 Protobuf 消息序列化格式。请只选择其中一个示例,并将其保存到本地目录中名为 `source.query.neo4j.json` 的文件里(该文件只能包含一个 JSON 对象)。

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONString",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

现在,执行以下 REST 调用来创建源实例:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.query.neo4j.json

这将创建一个 Kafka Connect 源实例。该实例通过所提供的查询,以您选择的序列化格式将更改事件消息发送到 `test-source` 主题(即上述配置中 `neo4j.query.topic` 的值)。在控制中心(Control Center)中,确认源连接器已出现在 Connect 选项卡下(位于 connect-default 下)。

在本例中,生成的更改事件消息将具有以下结构:

{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}