并行 Cypher 执行

本节描述用于并行执行 Cypher 语句的过程和函数。

过程和函数概述

下面描述了可用的过程和函数

限定名称 类型 版本

apoc.cypher.parallel

- 通过在 paramMap 中定义的列表(使用键 keyList)并行执行片段

过程

Apoc 扩展

apoc.cypher.parallel2

- 通过在 paramMap 中定义的列表(使用键 keyList)并行批处理执行片段

过程

Apoc 扩展

apoc.cypher.mapParallel

apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value - 使用列表段分配给_并行批处理执行片段

过程

Apoc 扩展

apoc.cypher.mapParallel2

apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - 使用列表段分配给_并行批处理执行片段

过程

Apoc 扩展

apoc.cypher.parallel

给定此数据集

UNWIND range(0, 9999) as idx CREATE (:Person {name: toString(idx)})

我们可以通过此过程通过 (:Person) 节点执行并行语句

MATCH (p:Person) WITH collect(p) as people
CALL apoc.cypher.parallel('RETURN a.name + t as title', {a: people, t: ' - suffix'}, 'a')
YIELD value RETURN value.title as title

在上面的查询中,我们传递了一个映射作为第二个参数,以及来自先前映射的字符串作为第三个参数。键为 'a' 的值将是并行循环的列表。请注意,不需要将at 作为查询参数传递(即$a$t),因为在底层,过程会在查询中添加它们,使用 WITH $parameterName as parameterName。因此,在本例中,WITH $a as a, $t as t

在这个例子中,我们并行执行多个查询WITH $a as a, $t as t RETURN a.name + t as title,其中apeople 列表中包含的 (:Person) 节点之一。

过程的结果是

表 1. 结果
title

"0 - suffix"

"1 - suffix"

"2 - suffix"

"3 - suffix"

"4 - suffix"

…​

…​

…​

…​

apoc.cypher.parallel2

此过程类似于apoc.cypher.parallel2,但底层工作方式不同(见下文)。使用之前的数据集,我们可以执行

MATCH (p:Person) WITH collect(p) as people
CALL apoc.cypher.parallel('RETURN a.name + t as title', {a: people, t: $suffix}, 'a')
YIELD value RETURN value.title as title

过程的结果是

表 2. 结果
title

"0 - suffix"

"1 - suffix"

"2 - suffix"

"3 - suffix"

"4 - suffix"

…​

…​

…​

…​

parallel 将要并行化的集合(在本例中为people)放入java.util.parallelStream() 中,然后执行多个类似这样的查询:WITH $a as a, $t as t RETURN a.name + t as title

parallel2 转换示例中,fragment 参数首先将集合people 分割成total / partitions 的批次大小,其中分区为100 * JVM 可用的处理器数量(如果total / partitions < 1,则为 1)。然后,它为每个批次创建了一个java.util.concurrent.Future,每个 Future 执行一个类似这样的查询:WITH $t AS t UNWIND $a AS a RETURN a.name + $t as title(其中$apeople 的当前批次)。最后,它计算了这些 Future 的结果。

通常,apoc.cypher.parallel2 过程比apoc.cypher.parallel 更推荐。