知识库

使用子查询控制聚合范围

聚合操作,例如 collect() 和 count(),在查询计划中显示为 EagerAggregation 运算符(带有深蓝色标题)。

这与 Eager 运算符类似,因为它提供了一个屏障,所有行都必须执行到并在此处停止才能进行聚合处理,但它不会改变查询的流式行为(前提是计划中没有实际的 Eager 运算符)。

这比 Eager 运算符对内存的占用更少,但聚合结果(包括用于分组键的变量)必须保留在内存中,直到聚合完成处理所有输入,然后才能从结果行恢复流式传输。

因此,如果聚合产生的行足够多,可能会给堆内存带来压力,这可能导致高 GC 暂停,最坏情况是堆内存耗尽。

本文展示了如何使用 Neo4j 4.1 中引入的子查询来缩小聚合的范围。这可能会减轻堆内存压力,从而实现更高效的内存查询。

话虽如此,正确性是首要考虑的;对所有输入行进行 Eager 聚合可能是计算正确结果所必需的。但是,当您希望 Eager 聚合仅应用于特定的扩展(或输入段)时,您可以通过子查询来控制。

甚至在 4.1 之前,您也可以通过模式推导式(Pattern Comprehensions)实现类似的效果(尽管规模较小),它能够将 collect() 范围限定在推导式中扩展的模式。

一些 APOC 过程也可以用作子查询的替代品。

聚合行为示例

让我们使用一个简单的例子:Neo4j Browser 中 :play movies 指南里的电影图谱。

从该指南创建图谱后,我们来看一个简单的查询,它返回每个电影节点及其演员节点列表。

MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
RETURN movie, collect(p) as actors

这会如何执行?

将找到第一个路径匹配(从一个节点进行标签扫描,然后展开模式,并根据节点标签进行过滤),然后开始聚合处理。

将找到下一个路径匹配,聚合将处理该行,依此类推,聚合会持续接收数据行以相应地构建聚合结果。

最终,所有 172 条路径都已由聚合处理。聚合已构建了 38 个结果行(每部电影一行,作为分组键),每行包含电影节点和该电影的演员列表。现在所有输入行都已处理完毕,聚合完成,并开始从这 38 个结果行进行流式传输。

堆中的这种中间状态不包括节点或关系属性,除非有任何已作为变量投影出来的属性。

稍微大一点的数据集

这是一个小数据集。但是如果我们有一个大得多的数据集会怎样?据估计,历史上已经制作了 50 万部电影。50 万行同时保存在堆中仍然不是问题,我们必须看更大的数字才能构成挑战,或者至少在执行过程中产生显著的时间差异。

让我们创建一个包含 100 万部电影、100 万名演员,并且每部电影有 10 名演员的电影图谱。

我们将使用 APOC 过程中的 apoc.periodic.iterate() 来创建图谱。

首先,让我们为电影和人物分别创建 100 万个节点。

CALL apoc.periodic.iterate("
UNWIND range(1,1000000) as id
RETURN id
",
"
CREATE (m:Movie {id:id})
CREATE (p:Person {id:id})
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages

我们目前不需要额外的属性。我们正在进行的匹配和聚合不依赖于节点属性,而且在我们将它们投影出来之前,我们也不需要为使用它们支付成本。

现在让我们确保索引已就位。

CREATE INDEX ON :Person(id);
CREATE INDEX ON :Movie(id);

现在我们有了索引,我们可以随机为每部电影分配 10 名人物作为演员。

CALL apoc.periodic.iterate("
MATCH (m:Movie)
RETURN m
",
"
UNWIND range(0,10) as i
WITH m, toInteger(rand() * 1000000) as id
MATCH (p:Person {id:id})
CREATE (m)<-[:ACTED_IN]-(p)
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages

对于我的笔记本电脑,我为堆内存配置了 4GB。实际的服务器部署通常会使用至少两倍的内存,建议最大可达 31GB。

让我们看看我们如何操作,使用原始查询的一个稍微修改过的版本。我希望排除结果的返回(其中包括所有这些节点的属性访问),因此我们只以 count() 聚合结束,这通常便宜得多(毕竟它只是在最后为每个输入行增加计数)。

MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
WITH movie, collect(p) as actors
RETURN count(*)

分组键仍然是电影,所以我们知道在聚合构建过程中,我们必须保留多达 100 万行,以及每部电影的演员列表。

根据堆内存(以及同时执行的其他查询),这可能会给堆内存带来压力,导致内存耗尽且无法回收时出现高 GC 暂停。我们可能会完全耗尽所有堆内存。

那么让我们试一试。首先,这是 EXPLAIN 计划。

subqueries for aggregations plan1

我们可以看到 collect(更昂贵)和 count(便宜)的聚合。让我们尝试运行它。

Started streaming 1 records after 1 ms and completed after 14907 ms.

它返回了 100 万的计数(已省略,因为这不重要),但更有趣的是执行时间,或者更确切地说,当我们有一个可供比较的替代查询时,它会变得有趣。对我来说,这大约用了 15 秒。

这个查询最有趣的部分实际上在调试日志中。

2020-10-01 04:06:31.703+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=178, gcTime=248, gcCount=1}
2020-10-01 04:06:32.893+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=254, gcTime=269, gcCount=1}
2020-10-01 04:06:34.620+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=277, gcTime=295, gcCount=1}
2020-10-01 04:06:36.506+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=328, gcTime=383, gcCount=1}
2020-10-01 04:06:38.847+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=542, gcTime=628, gcCount=1}
2020-10-01 04:06:40.937+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=346, gcTime=384, gcCount=1}
2020-10-01 04:06:42.994+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=314, gcTime=348, gcCount=1}
2020-10-01 04:06:44.965+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=241, gcTime=271, gcCount=1}
2020-10-01 04:07:04.570+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=242, gcTime=256, gcCount=1}
2020-10-01 04:08:42.469+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=169, gcTime=198, gcCount=1}

这些 GC 单独来看并不高,但这表明这样的聚合确实会导致 GC 暂停。对于更复杂的查询或更复杂的数据集,这些暂停实际上可能会变得相当显著。

子查询缩小聚合范围

如果我们在正确的位置使用子查询并在子查询中进行聚合,我们可以缩小聚合的范围,并避免同时在内存中呈现所有这些行的需要。

MATCH (movie:Movie)
CALL {
    WITH movie
    MATCH (movie)<-[:ACTED_IN]-(p:Person)
    RETURN collect(p) as actors
}
RETURN count(*)

这种方式应该更节省内存。

请记住,子查询是按行执行的。由于子查询之前的 MATCH,我们为每部电影都有一行。

MATCH 和聚合发生在子查询内部,因此对于每个 collect(),它一次只考虑一部电影的路径。这意味着每个 collect() 只应用于 10 个输入行(因为每部电影有 10 位演员),所以单行的结果会非常快地得到。

请注意,这是一个权衡:我们不是对 100 万行数据执行单个 collect() 聚合,而是使用子查询在电影级别分解工作。因为我们有 100 万部电影,所以我们最终会进行 100 万次子查询调用,每个调用都执行自己的扩展和 collect(),因此 collect() 总共被调用 100 万次,但每次只需在极小的数据集上运行。

我们可以为每个输入行执行子查询,在这个有限的范围内执行聚合,输出结果,然后继续处理下一行。在执行该行期间使用的内存都符合垃圾回收条件,并且在处理后续行时无需保留在堆中。

首先,让我们检查这个查询的计划。

subqueries for aggregations plan2

请注意,我们仍然看到 collect() 的 Eager 聚合,但它正输入到一个 Apply 操作中,这表明聚合的范围仅限于它所应用的项,即每个电影节点。

让我们尝试运行它。我将省略实际的查询结果,因为我们知道它仍然是 100 万,但让我们检查一下时间。

Started streaming 1 records after 1 ms and completed after 5542 ms.

重复运行的结果略有不同,但通常在 4 到 6 秒之间。这比原始查询的 15 秒有了很大的改进。

调试日志中的 GC 暂停情况如何?结果可能因人而异,但即使重复执行查询几次,我也没看到任何 GC 被记录下来。

这表明对大量行同时进行的聚合可能会消耗大量内存,而您通常可以通过巧妙地应用子查询来缩小聚合范围,从而避免这种情况以及由此导致的 GC 暂停(前提是这种做法适合您的用例)。

模式推导式类似于在子查询中调用的 collect()

模式推导式可以实现类似的效果,并且自 Neo4j 3.1 版本起即可使用。

MATCH (movie:Movie)
WITH movie, [(movie)<-[:ACTED_IN]-(p:Person) | p] as actors
RETURN count(*)

模式推导式最像 OPTIONAL MATCH 之后跟着 collect(),但与子查询类似,它们是按行执行的。甚至 EXPLAIN 计划也相似。

subqueries for aggregations plan3

请注意,带有 collect() eager 聚合的执行行这次会输入到 ConditionalApply 中,它是 Apply 的一个变体,这意味着右侧在一个嵌套循环中执行,这也是这些操作的作用域。

它的表现如何?

Started streaming 1 records after 1 ms and completed after 4539 ms.

重复运行的结果在 4 到 6 秒之间,与使用子查询的版本大致相同。同样,我们在调试日志中也没有看到 GC。

因此,就效率而言,无论是时间还是内存,模式推导式都与使用子查询大致相同。

虽然这比使用子查询更简洁,并且通常更通用(您可以在单个 WITH 子句中使用多个模式推导式),但它们仅用于收集结果。尽管您可以获取结果列表的 size() 作为 count() 的等价物,但不能将其用于任何其他类型的聚合。

此外,模式推导式尚不允许对列表结果进行排序、跳过或限制,而如果使用子查询,所有这些都可以自由使用。

APOC 过程可以替代子查询

如果您使用的不是 Neo4j 4.1.x 或更高版本,APOC 中有一些过程可以充当子查询,实现相同的效果。

MATCH (movie:Movie)
CALL apoc.cypher.run("
    WITH movie
    MATCH (movie)<-[:ACTED_IN]-(p:Person)
    RETURN collect(p) as actors", {movie:movie}) YIELD value
WITH movie, value.actors as actors
RETURN count(*)

过程与子查询类似,都是按行执行的,因此 collect() 聚合的范围也仅限于该特定调用中匹配的行。

由于有 100 万部电影,总共会有 100 万次 apoc.cypher.run() 调用,每个调用都执行自己的 MATCH 和小范围的 collect()。

我们将省略此查询的计划,因为它不会显示任何有趣的内容。我们会看到一个过程调用操作,但由于查询的核心部分是查询字符串的形式,规划器无法评估它,因此它不会显示在计划中。

我们可以单独运行复制/粘贴的查询字符串的 EXPLAIN,只需稍作修改使其能够编译,但我们已经见过像这样的计划,带有 collect() 聚合。唯一的区别是,此计划将作为完全独立的事务进行规划和执行,其结果将提供给此查询的事务。让我们看看它的表现如何。

Started streaming 1 records after 1 ms and completed after 136441 ms.

哇,这里发生了什么?时间飙升得非常高,大约 2 分钟。为什么会这样?

这个 APOC 过程将查询作为新事务创建并执行,而原生子查询仍然在同一个单一事务中执行。这意味着我们实际上通过这种方法执行了 100 万个独立的 APOC 事务,这在设置和执行方面会产生开销。

如果这种方法在处理大量行时如此耗时,我们为什么还要考虑它呢?因为我们仍然在调试日志中没有看到 GC 暂停。

如果 GC 和堆内存不足是由于此类聚合导致您的查询出现的问题,并且如果您运行的版本不足以使用原生子查询,而且用例不允许您使用模式推导式,那么使用某些 APOC 过程的这种方法可能会让您避免这些 GC 和堆内存压力,但可能以时间为代价。

一如既往,请在您自己的数据上进行计时测试。

© . All rights reserved.