查询策略
查询策略允许用户定义自己的 Cypher 查询以提取更改。这需要对架构进行适当的修改,例如通过专门的变更跟踪属性(例如节点或关系上的时间戳)跟踪更改,或使用软删除来跟踪实体的删除。
配置
首先,您需要为连接器实例选择 QUERY 策略;
"neo4j.source-strategy": "QUERY"
其次,您需要定义要跟踪的更改的查询以及要发布更改的位置。
"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 | 将接收消息的主题名称。 |
2 | 一个 Cypher 查询,它返回自上次迭代以来更改的实体,由 `$lastCheck` 参数发送。 |
3 | 我们用作游标来跟踪更改的属性(字段名称)。这需要是返回结果的一部分。 |
创建源实例
根据上面的示例,您可以使用以下配置之一。选择消息序列化格式示例之一,并将其保存为名为 `source.query.neo4j.json` 的文件到本地目录。
现在,我们将通过调用以下 REST 调用来创建源实例
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.query.neo4j.json
这将创建一个 Kafka Connect 源实例,该实例将通过提供的查询将更改事件消息发送到 `my-topic` 主题,使用您选择的序列化格式。在控制中心中,确认在连接选项卡下(在 connect-default 下)创建了源连接器。
在这种情况下,生成的更改事件消息将具有以下结构
{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}