使用 CDC 查询更改(过程)

启用 CDC 后,您可以使用许多 Cypher 过程来查询更改。

CDC 使用 *更改标识符*,它本质上是指向特定事务的指针。CDC 过程允许您检索特定的更改标识符或查询影响数据库 *在* 给定更改标识符 *之后* 的更改。

**更改标识符是为其生成的数据库所独有的**,不能用于查询另一个数据库上的更改。在从备份恢复数据库时请记住这一点,因为恢复的数据库实际上是一个新的数据库。有关详细信息,请参阅 从备份恢复

检索最早的更改标识符

过程 db.cdc.earliest 返回最早可用更改的更改标识符。

示例 1. 查询最早的更改标识符
查询
CALL db.cdc.earliest()
表 1. 结果
id

"A3V16ZaLlUmnipHLFkWrlA0AAAAAAAAABQAAAAAAAAAA"

检索当前更改标识符

过程 db.cdc.current 返回最后一个已提交事务的更改标识符。

请注意,**返回的标识符是排他的**:它不包括其指向的事务中发生的更改。

示例 2. 查询当前更改标识符
查询
CALL db.cdc.current()
表 2. 结果
id

"A3V16ZaLlUmnipHLFkWrlA0AAAAAAAAABQAAAAAAAAAA"

查询更改

过程 db.cdc.query 返回 *在* 给定更改标识符 *之后* 发生在数据库上的更改。

db.cdc.query(
    from =  :: STRING?, (1)
    selectors = [] :: LIST? OF MAP? (2)
) :: (
    id :: STRING?, (3)
    txId :: INTEGER?, (4)
    seq :: INTEGER?, (5)
    metadata :: MAP?, (6)
    event :: MAP? (7)
)
1 要从中查询更改的更改标识符,可以从对 db.cdc.query 的早期调用中捕获,也可以从 db.cdc.currentdb.cdc.earliest 中捕获一个。默认值为 ""(空字符串),它隐式地替换为 db.cdc.current。**此值被视为排他的**,因此查询的结果不包括与此更改标识符对应的事务中发生的更改。
2 一个可选的 选择器 列表,用于筛选更改。默认为空列表,这意味着返回 *所有* 更改,不进行任何筛选。
3 与每个更改记录关联的唯一更改标识符。每个都可以用于进一步的 db.cdc.query 调用。
4 一个数字,标识更改发生在哪个事务中,与 seq 结合使用时是唯一的。事务标识符不是连续的(某些事务,例如系统和模式命令,不会记录在更改数据捕获中,并导致事务标识符中出现间隙)。
5 一个用于对同一事务中发生的更改进行排序的数字。请注意,输出中观察到的更改顺序不一定对应于事务期间应用更改的顺序。
6 包含事务元数据的键值对。对于单个事务中的所有更改,此值都是相同的。有关元数据字段的详细说明,请参阅 更改事件的格式
7 受影响实体上的检索到的更改。有关更改事件的详细说明,请参阅 更改事件的格式
示例 3. 查询在创建 :Person 节点后的更改
查询
CALL db.cdc.query("A3V16ZaLlUmnipHLFkWrlA0AAAAAAAAABAAAAAAAAAAA")
表 3. 结果
id txId seq metadata event

"A3V16ZaLlUmnipHLFkWrlA0AAAAAAAAABQAAAAAAAAAA"

4

0

{
  "txStartTime": "2024-04-03T06:28:19.630000000Z",
  "databaseName": "neo4j",
  "executingUser": "neo4j",
  "authenticatedUser": "neo4j",
  "connectionServer": "172.17.0.2:7687",
  "connectionType": "bolt",
  "serverId": "20668765",
  "captureMode": "FULL",
  "connectionClient": "172.17.0.1:41888",
  "txCommitTime": "2024-04-03T06:28:19.651000000Z",
  "txMetadata": {
    "app": "neo4j-browser_v5.15.0",
    "type": "user-direct"
  }
}
{
  "elementId": "4:68262997-88e3-4518-83ec-d944674609f4:8",
  "operation": "c",
  "keys": {

  },
  "labels": [
    "Person"
  ],
  "state": {
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "name": "Stefano"
      }
    },
    "before": null
  },
  "eventType": "n"
}

检索最早的更改事件

过程 db.cdc.earliest 返回最早可用更改的更改 *标识符*,而不是关联的更改 *事件* 信息。要检索最早的更改事件,您可以将最早的更改 ID 馈送到 db.cdc.query 并将结果限制为一个。更改事件已按时间顺序排序,因此不需要排序。

CALL db.cdc.earliest() YIELD id AS earliestId
CALL db.cdc.query(earliestId) YIELD event, metadata
RETURN event, metadata LIMIT 1

检索最新的更改事件

要检索最新的更改事件,您可以将最早的更改 ID 馈送到 db.cdc.query,反向排序它们,并将结果限制为一个。

CALL db.cdc.earliest() YIELD id AS earliestId
CALL db.cdc.query(earliestId) YIELD event, metadata
RETURN event, metadata ORDER BY metadata.txCommitTime DESC LIMIT 1
此查询可能非常慢,因为它需要扫描所有更改事件。

一个最小的工作示例

**查看 CDC 运行情况** 的最简单方法是 检索当前更改标识符,在数据库中创建一个新节点,然后使用先前的更改标识符 查询更改。操作必须在三个单独的事务中进行。

事务 1 - 检索当前更改 ID
CALL db.cdc.current() YIELD id AS currentId
// currentId = 'BaQswf9NV0b7qicwDdh7vfwAAAAAAAAD83__________AAABjzfm4bM'
事务 2 - 创建新的 Person 节点
CREATE (:Person:Hero {name: 'Batman', color: 'black'})
事务 3 - 检索 Person 节点创建的更改事件
WITH 'BaQswf9NV0b7qicwDdh7vfwAAAAAAAAD83__________AAABjzfm4bM=' AS previousId
CALL db.cdc.query(previousId) YIELD event, metadata RETURN event, metadata
表 4. 结果
event metadata
{
  "elementId": "4:a42cc1ff-4d57-46fb-aa27-300dd87bbdfc:55135",
  "keys": {}, "state": {
    "before": NULL,
    "after": {
      "properties": {
        "name": "Batman",
        "color": "black"
      },
      "labels": ["Person", "Hero"]
    }
  },
  "eventType": "n",
  "operation": "c",
  "labels": ["Person", "Hero"]
}
{
  "txMetadata": {
    "app": "neo4j-browser_v5.15.0",
    "type": "user-direct"
  },
  "executingUser": "neo4j",
  "databaseName": "cdc",
  "connectionClient": "127.0.0.1:48008",
  "authenticatedUser": "neo4j",
  "captureMode": "DIFF",
  "connectionServer": "127.0.0.1:7687",
  "connectionType": "bolt",
  "serverId": "20668765",
  "txStartTime": 2024-05-02T06:13:08.521Z,
  "txCommitTime": 2024-05-02T06:13:08.551Z
}