周期性执行

APOC 库包含可用于在执行大型写入操作时进行批量事务处理的过程。

从 Neo4j 5.21 开始,可以使用 Cypher 命令 CALL {…​} IN CONCURRENT TRANSACTIONS 并行处理事务。更多信息请参见 事务中的 CALL 子查询 → 并发事务

批量事务函数

限定名称 类型

apoc.periodic.commit(statement STRING, params MAP<STRING, ANY>) - 在单独的批量事务中运行给定的语句。

过程

apoc.periodic.iterate(cypherIterate STRING, cypherAction STRING, config MAP<STRING, ANY>) - 对第一个语句返回的每个项运行第二个语句。此过程返回批次数和已处理的总行数。

过程

周期性迭代

apoc.periodic.iterate 过程在需要处理大量数据(如导入、重构以及其他需要大型事务的场景)时非常有用。它通过将工作负载分为两部分来提供批量处理数据的方式:

数据驱动语句

这定义了您如何选择需要处理的数据。您可以提供 Cypher 语句来从现有图数据中选择,从文件或 API 读取外部数据,或从另一个数据存储中检索数据。

操作语句

这定义了您希望对所选数据执行的操作。您可以执行诸如运行 Cypher 来更新、创建/删除数据,或者运行其他过程来在加载前操作和转换值。

数据驱动语句作为**第一个**语句提供,其结果是一个待处理的值流。操作语句作为**第二个**语句提供,用于一次处理**一个**元素,或(使用 batchMode: "BATCH")一次处理一个批次。数据驱动语句的结果会作为参数传递给操作语句,因此它们会以其名称自动可用。

表 1. 配置
名称 类型 默认值 描述

batchSize

整数

10000

在单个事务中运行指定数量的操作语句 - 参数: {_count, _batch}

parallel

布尔值

false

并行运行操作语句(注意:如果存在冲突,语句可能会死锁)
请注意,在 parallel: false 的情况下,APOC 旨在重用同一个 java.util.concurrent.ThreadPoolExecutor,最大池大小为 1,以防止并行执行;这意味着如果您想执行多个 apoc.periodic.iterate,每个都将在前一个完成后执行。相反,使用 parallel: true 时,APOC 将使用一个 ThreadPoolExecutor,其最大池大小可通过 apoc.jobs.pool.num_threads 配置进行配置,或默认为可用处理器数量 * 2。因此,如果我们执行多个 apoc.periodic.iterate,如果队列池大小可以接受新任务,每个都将并行执行。此外,需要注意的是,并行运行会影响所有数据库,而不仅仅是您正在使用的单个数据库。因此,例如,如果有两个数据库 db1db2,如果在 db2 上执行 apoc.periodic.iteratedb1 上的 apoc.periodic.iterate 将会影响性能。

retries

整数

0

如果操作语句失败并出现错误,则暂停 100 毫秒并重试,直到达到重试次数 - 参数 {_retry}

batchMode

字符串

"BATCH"

数据驱动语句应如何由操作语句处理。有效值包括

  • "BATCH" - 每批次大小执行一次操作语句。操作语句以以下内容为前缀,从 $_batch 参数中提取数据驱动语句中返回的每个字段

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - 每次执行一个操作语句

  • "BATCH_SINGLE" - 每批次大小执行一次操作语句,但批次解包留给操作语句。操作查询可以通过 $_batch 参数访问批量值。

params

映射

{}

外部传入参数映射

concurrency

整数

可用处理器数量

使用 parallel:true 时生成的并发任务数

failedParams

整数

-1

如果设置为非负值,每个失败的批次(最多 failedParams 参数集)将通过 yield failedParams 返回。

在 APOC 4.0.0.11 及更早版本中,iterateList 配置键用于控制数据驱动语句返回值的批量处理。此功能在 4.0.0.12 版本中被 batchMode 替换。这些配置键的处理方式如下:

  • 如果提供了 batchMode,其值优先于 iterateList

  • 如果未提供 batchMode 而提供了 iterateList,则 iterateList 的值将按以下表格所述进行转换。

  • 如果既未提供 batchMode 也未提供 iterateList,则 batchMode 默认为 BATCH,这与 iterateList:true 相同。

表 2. 弃用配置
参数 默认值 描述

iterateList

true

每批次大小执行一次操作语句(整个 batchSize 列表作为参数 {_batch} 传入)

  • 值为 true 等同于 batchMode: "BATCH"

  • 值为 false 等同于 batchMode: "SINGLE"

周期性迭代示例

让我们看一些例子。

如果您要为数百万个 :Person 节点添加 :Actor 标签,您可以运行以下代码:

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

让我们分解传递给过程的参数:

  • 我们的第一个 Cypher 语句选择所有与另一个节点具有 ACTED_IN 关系并返回这些 Person 节点。这是数据驱动部分,我们在此选择要更改的数据。

  • 我们的第二个 Cypher 语句为每个选定的 Person 节点设置 :Actor 标签。这是操作部分,我们在此将更改应用于第一个语句中的数据。

  • 最后,我们指定希望过程使用的任何配置。我们定义了 batchSize 为 10,000,并并行运行这些语句。

执行此过程将获取第一个 Cypher 语句中收集的所有 Person 节点,并使用第二个 Cypher 语句更新每个节点。它将工作分成批次——从流中获取 10,000 个 Person 节点并在单个事务中更新它们。如果我们的图中包含 30,000 个具有 ACTED_IN 关系的 Person 节点,那么它将分成 3 个批次。

最后,它并行运行这些操作,因为更新节点标签或属性不会产生冲突。

对于更复杂的操作,如更新或删除关系,请**不要使用 parallel: true**,或者确保您以一次操作更新每个数据子图的方式批量处理工作,例如通过传输根对象。如果您尝试复杂操作,还要启用重试失败的操作,例如使用 retries:3

现在我们来看一个更复杂的例子。

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
  "MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

让我们分解传递给过程的参数:

  • 我们的第一个 Cypher 语句选择所有订单日期晚于 2016 年 10 月 13 日Order 节点(第一个 Cypher 语句)。

  • 我们的第二个 Cypher 语句获取这些组,并查找与其它节点具有 HAS_ITEM 关系的节点,然后汇总这些项的值,并将该总和设置为总订单值的属性(o.value)。

  • 我们的配置将把这些节点分成 100 个一组(batchSize:100),并并行运行这些批次,供第二个语句处理。

批处理模式: BATCH_SINGLE

如果我们的操作语句调用了一个接受批量值的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取批量值并将其传递给该过程。当我们使用 BATCH_SINGLE 时,操作语句将可以访问 $_batch 参数,该参数将包含数据驱动语句中返回的字段列表。

例如,如果数据驱动语句是:

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

传递给操作语句的 $_batch 变量的内容将是:

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

让我们看一个实际的例子。我们首先创建一些节点:

以下查询创建 100,000 个带有标签 Person 和属性 id 的节点:
UNWIND range(1,100000) as id create (:Person {id: id})

我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅 删除数据

此过程接受一个节点列表,我们可以从 $_batch 参数中提取该列表。

以下查询流式传输所有 Person 节点,并以 100 个为一批次删除它们:
CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN p",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

在操作语句中使用的 $_batch 参数的内容如下:

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

我们可以使用 列表推导式从列表中的每个项中提取 p 变量。

如果运行此查询,我们将看到以下输出:

表 3. 结果
批次 操作

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}

周期性提交

特别是对于图处理,在单独的事务中重复运行查询直到不再处理并生成任何结果是很有用的。因此,您可以批量迭代不满足条件的元素并更新它们,以便它们之后满足条件。

作为安全网,apoc.periodic.commit 中使用的语句**必须**包含 LIMIT 子句。

查询会在单独的事务中重复执行,直到返回 0。

call apoc.periodic.commit(
  "match (user:User) WHERE user.city IS NOT NULL
   with user limit $limit
   MERGE (city:City {name:user.city})
   MERGE (user)-[:LIVES_IN]->(city)
   REMOVE user.city
   RETURN count(*)",
  {limit:10000})
表 4. 结果
更新 执行次数

2000000

200

进度日志

要可视化 apoc.periodic.iterateapoc.periodic.commit 的详细进度日志,请在 neo4j.conf 文件中将 dbms.logs.debug.level 设置为 DEBUG

在以下查询中,dbms.logs.debug.level 已设置为 DEBUG

UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});

返回以下日志:

2020-11-27 09:03:44.279+0000 INFO  Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions