周期性执行
APOC 库包含可用于在执行大型写入操作时进行批量事务处理的过程。
从 Neo4j 5.21 开始,可以使用 Cypher 命令 CALL {…} IN CONCURRENT TRANSACTIONS 并行处理事务。更多信息请参见 事务中的 CALL 子查询 → 并发事务。 |
批量事务函数
限定名称 | 类型 |
---|---|
|
过程 |
|
过程 |
周期性迭代
apoc.periodic.iterate
过程在需要处理大量数据(如导入、重构以及其他需要大型事务的场景)时非常有用。它通过将工作负载分为两部分来提供批量处理数据的方式:
- 数据驱动语句
-
这定义了您如何选择需要处理的数据。您可以提供 Cypher 语句来从现有图数据中选择,从文件或 API 读取外部数据,或从另一个数据存储中检索数据。
- 操作语句
-
这定义了您希望对所选数据执行的操作。您可以执行诸如运行 Cypher 来更新、创建/删除数据,或者运行其他过程来在加载前操作和转换值。
数据驱动语句作为**第一个**语句提供,其结果是一个待处理的值流。操作语句作为**第二个**语句提供,用于一次处理**一个**元素,或(使用 batchMode: "BATCH"
)一次处理一个批次。数据驱动语句的结果会作为参数传递给操作语句,因此它们会以其名称自动可用。
名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
batchSize |
整数 |
10000 |
在单个事务中运行指定数量的操作语句 - 参数: {_count, _batch} |
parallel |
布尔值 |
false |
并行运行操作语句(注意:如果存在冲突,语句可能会死锁) |
retries |
整数 |
0 |
如果操作语句失败并出现错误,则暂停 100 毫秒并重试,直到达到重试次数 - 参数 {_retry} |
batchMode |
字符串 |
"BATCH" |
数据驱动语句应如何由操作语句处理。有效值包括
UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2
|
params |
映射 |
{} |
外部传入参数映射 |
concurrency |
整数 |
可用处理器数量 |
使用 |
failedParams |
整数 |
-1 |
如果设置为非负值,每个失败的批次(最多 |
在 APOC 4.0.0.11 及更早版本中,
|
参数 | 默认值 | 描述 |
---|---|---|
iterateList |
true |
每批次大小执行一次操作语句(整个 batchSize 列表作为参数 {_batch} 传入)
|
周期性迭代示例
让我们看一些例子。
如果您要为数百万个 :Person
节点添加 :Actor
标签,您可以运行以下代码:
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
让我们分解传递给过程的参数:
-
我们的第一个 Cypher 语句选择所有与另一个节点具有
ACTED_IN
关系并返回这些Person
节点。这是数据驱动部分,我们在此选择要更改的数据。 -
我们的第二个 Cypher 语句为每个选定的
Person
节点设置:Actor
标签。这是操作部分,我们在此将更改应用于第一个语句中的数据。 -
最后,我们指定希望过程使用的任何配置。我们定义了
batchSize
为 10,000,并并行运行这些语句。
执行此过程将获取第一个 Cypher 语句中收集的所有 Person
节点,并使用第二个 Cypher 语句更新每个节点。它将工作分成批次——从流中获取 10,000 个 Person
节点并在单个事务中更新它们。如果我们的图中包含 30,000 个具有 ACTED_IN
关系的 Person
节点,那么它将分成 3 个批次。
最后,它并行运行这些操作,因为更新节点标签或属性不会产生冲突。
对于更复杂的操作,如更新或删除关系,请**不要使用 |
现在我们来看一个更复杂的例子。
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
"MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
让我们分解传递给过程的参数:
-
我们的第一个 Cypher 语句选择所有订单日期晚于
2016 年 10 月 13 日
的Order
节点(第一个 Cypher 语句)。 -
我们的第二个 Cypher 语句获取这些组,并查找与其它节点具有
HAS_ITEM
关系的节点,然后汇总这些项的值,并将该总和设置为总订单值的属性(o.value
)。 -
我们的配置将把这些节点分成 100 个一组(
batchSize:100
),并并行运行这些批次,供第二个语句处理。
批处理模式: BATCH_SINGLE
如果我们的操作语句调用了一个接受批量值的过程,我们可以使用 batchMode: "BATCH_SINGLE"
来获取批量值并将其传递给该过程。当我们使用 BATCH_SINGLE
时,操作语句将可以访问 $_batch
参数,该参数将包含数据驱动语句中返回的字段列表。
例如,如果数据驱动语句是:
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
传递给操作语句的 $_batch
变量的内容将是:
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
让我们看一个实际的例子。我们首先创建一些节点:
Person
和属性 id
的节点:UNWIND range(1,100000) as id create (:Person {id: id})
我们可以使用 apoc.nodes.delete
过程删除这些节点。请参阅 删除数据。
此过程接受一个节点列表,我们可以从 $_batch
参数中提取该列表。
Person
节点,并以 100 个为一批次删除它们:CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN p",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
在操作语句中使用的 $_batch
参数的内容如下:
[
{p: Node<1>},
{p: Node<2>},
...
]
我们可以使用 列表推导式从列表中的每个项中提取 p
变量。
如果运行此查询,我们将看到以下输出:
批次 | 操作 |
---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} |
{total: 100000, committed: 100000, failed: 0, errors: {}} |
周期性提交
特别是对于图处理,在单独的事务中重复运行查询直到不再处理并生成任何结果是很有用的。因此,您可以批量迭代不满足条件的元素并更新它们,以便它们之后满足条件。
作为安全网,apoc.periodic.commit 中使用的语句**必须**包含 LIMIT 子句。 |
查询会在单独的事务中重复执行,直到返回 0。
call apoc.periodic.commit(
"match (user:User) WHERE user.city IS NOT NULL
with user limit $limit
MERGE (city:City {name:user.city})
MERGE (user)-[:LIVES_IN]->(city)
REMOVE user.city
RETURN count(*)",
{limit:10000})
更新 | 执行次数 |
---|---|
2000000 |
200 |
进度日志
要可视化 apoc.periodic.iterate
或 apoc.periodic.commit
的详细进度日志,请在 neo4j.conf
文件中将 dbms.logs.debug.level
设置为 DEBUG
。
在以下查询中,dbms.logs.debug.level
已设置为 DEBUG
。
UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});
返回以下日志:
2020-11-27 09:03:44.279+0000 INFO Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions