查询策略

查询策略允许用户定义自己的 Cypher 查询来提取变更。这需要适当的模式修改,例如通过专用的变更跟踪属性(如节点或关系上的时间戳)来跟踪变更,或使用软删除来跟踪实体删除。

配置

首先,您需要为连接器实例选择查询 (QUERY) 策略;

"neo4j.source-strategy": "QUERY"

其次,您需要定义查询以跟踪变更,并确定发布位置。

"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 将接收消息的主题名称。
2 一个 Cypher 查询,返回自上次迭代以来已变更的实体,通过 $lastCheck 参数传入。
3 我们用作游标来跟踪变更的属性(字段名)。这需要作为返回结果的一部分。

创建源实例

根据以上示例,您可以使用以下配置之一。选择其中一种消息序列化格式示例,并将其保存为名为 source.query.neo4j.json 的文件到本地目录中。

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONString",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

我们现在将通过调用以下 REST 请求来创建源实例

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.query.neo4j.json

这将创建一个 Kafka Connect 源实例,该实例将使用您偏好的序列化格式,把由提供的查询派生出的变更事件消息发送到 my-topic 主题。在 Control Center 中,请确认在 Connect 选项卡下的 connect-default 中已创建源连接器。

在此情况下,生成的变更事件消息将具有以下结构

{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}
© . All rights reserved.