并行 Cypher 执行

本节描述了用于并行执行 Cypher 语句的过程和函数。

过程和函数概述

下面描述了可用的过程和函数

限定名称 类型 发布

apoc.cypher.parallel

- 通过 paramMap 中以 keyList 为键定义的列表并行执行片段

过程

APOC 扩展

apoc.cypher.parallel2

- 通过 paramMap 中以 keyList 为键定义的列表分批并行执行片段

过程

APOC 扩展

apoc.cypher.mapParallel

apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value - 分批并行执行片段,列表片段被分配到 _

过程

已弃用

APOC 扩展

apoc.cypher.mapParallel2

apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - 分批并行执行片段,列表片段被分配到 _

过程

apoc.cypher.parallel

给定此数据集

UNWIND range(0, 9999) as idx CREATE (:Person {name: toString(idx)})

我们可以使用此过程通过 (:Person) 节点并行执行语句

MATCH (p:Person) WITH collect(p) as people
CALL apoc.cypher.parallel('RETURN a.name + t as title', {a: people, t: ' - suffix'}, 'a')
YIELD value RETURN value.title as title

在上面的查询中,我们将一个 map 作为第二个参数传递,并将前一个 map 中的一个字符串作为第三个参数传递。键为 'a' 的值将是用于并行循环的列表。注意,不需要将 at 作为查询参数(即 $a$t)传递,因为在底层,该过程将在查询中预置它们:WITH $parameterName as parameterName。因此在本例中,WITH $a as a, $t as t

在此示例中,我们并行执行多个查询 WITH $a as a, $t as t RETURN a.name + t as title,其中 a 是包含在 people 列表中的 (:Person) 节点之一。

过程的结果是

表 1. 结果
标题

"0 - 后缀"

"1 - 后缀"

"2 - 后缀"

"3 - 后缀"

"4 - 后缀"

…​

…​

…​

…​

apoc.cypher.parallel2

此过程与 apoc.cypher.parallel2 相似,但在底层工作方式不同(参见下文)。使用先前的数据集,我们可以执行

MATCH (p:Person) WITH collect(p) as people
CALL apoc.cypher.parallel('RETURN a.name + t as title', {a: people, t: $suffix}, 'a')
YIELD value RETURN value.title as title

过程的结果是

表 2. 结果
标题

"0 - 后缀"

"1 - 后缀"

"2 - 后缀"

"3 - 后缀"

"4 - 后缀"

…​

…​

…​

…​

parallel 将要并行处理的集合(在本例中为 people)放入 java.util.parallelStream() 中,然后执行多个如下所示的查询:WITH $a as a, $t as t RETURN a.name + t as title

parallel2 转换示例中,fragment 参数首先将集合 people 分割成大小为 total / partitions 的批次,其中 partitions 是 JVM 可用处理器数量 * 100(如果 total / partitions < 1,则为 1)。然后,它为每个批次创建了一个 java.util.concurrent.Future,每个 Future 执行一个如下所示的查询:WITH $t AS t UNWIND $a AS a RETURN a.name + $t as title(其中 $a 是当前批次的 people)。最后,它计算了 futures。

通常,建议使用 apoc.cypher.parallel2 过程而不是 apoc.cypher.parallel

© . All rights reserved.