查询策略
查询策略允许用户定义自己的 Cypher 查询来提取变更。这需要适当的模式修改,例如通过专用的变更跟踪属性(如节点或关系上的时间戳)来跟踪变更,或使用软删除来跟踪实体删除。
配置
首先,您需要为连接器实例选择查询 (QUERY) 策略;
"neo4j.source-strategy": "QUERY"
其次,您需要定义查询以跟踪变更,并确定发布位置。
"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 | 将接收消息的主题名称。 |
2 | 一个 Cypher 查询,返回自上次迭代以来已变更的实体,通过 $lastCheck 参数传入。 |
3 | 我们用作游标来跟踪变更的属性(字段名)。这需要作为返回结果的一部分。 |
创建源实例
根据以上示例,您可以使用以下配置之一。选择其中一种消息序列化格式示例,并将其保存为名为 source.query.neo4j.json
的文件到本地目录中。
我们现在将通过调用以下 REST 请求来创建源实例
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.query.neo4j.json
这将创建一个 Kafka Connect 源实例,该实例将使用您偏好的序列化格式,把由提供的查询派生出的变更事件消息发送到 my-topic
主题。在 Control Center 中,请确认在 Connect 选项卡下的 connect-default
中已创建源连接器。
在此情况下,生成的变更事件消息将具有以下结构
{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}