使用子查询控制聚合范围
聚合,例如 collect() 和 count(),在查询计划中显示为 EagerAggregation
运算符(带有深蓝色标题)。
它们类似于 Eager 运算符,因为它表示一个屏障,所有行都必须执行并在此停止以进行聚合处理,但在其他方面不会更改查询的流式行为(前提是计划中没有实际的 Eager 运算符)。
这比 Eager 运算符占用更少的内存,但聚合结果(包括用于分组键的变量)必须保留在内存中,直到聚合完成所有输入的处理,然后才能从结果行恢复流式传输。
因此,如果聚合产生的行足够多,则可能会给堆带来压力,这可能导致高 GC 暂停,或者在最坏的情况下耗尽堆内存。
本文介绍了如何使用 Neo4j 4.1 中引入的子查询来提供一种缩小聚合范围的方法。这可能会减少堆压力,从而产生更节省内存的查询。
也就是说,正确性首先很重要;可能需要对所有输入行进行急切聚合才能计算出正确的结果。但是,当您希望急切聚合仅应用于某个扩展(或输入段)时,则可以通过子查询来控制它。
即使在 4.1 之前,您也可以使用模式推导来实现类似的功能(尽管规模较小),这会将 collect() 的范围限制在推导中扩展的模式。
某些 APOC 过程也可以用作子查询的替代。
聚合行为示例
让我们使用一些简单的东西,即 Neo4j 浏览器中 :play movies
指南中的电影图。
从指南中创建此图后,让我们查看一个简单的查询,返回每个电影节点及其演员节点列表
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
RETURN movie, collect(p) as actors
这将如何执行?
将找到第一个路径匹配(从两个节点中的一个进行标签扫描,然后扩展模式,并根据节点标签进行筛选),然后开始聚合处理。
将找到下一个路径匹配,聚合将处理该行,依此类推,聚合将继续接收数据行并相应地构建聚合结果。
最终,所有 172 条路径都已由聚合处理。聚合构建了 38 个结果行(每部电影一个,即分组键),每个结果行包含电影节点和该电影的演员列表。现在所有输入行都已处理完毕,聚合完成,并从 38 个结果行开始流式传输。
堆中的此中间状态不包括节点或关系属性,除非任何已作为变量投影出的属性。 |
稍微大一点的数据集
这是一个小数据集。但是,如果我们有一个具有更大数据集的图会怎么样?据估计,迄今为止已经制作了 500,000 部电影。一次在堆中保存 500,000 行仍然不会有问题,我们必须查看更大的数字才能带来挑战,或者至少在执行期间产生显着的时间差异。
让我们创建一个包含 100 万部电影、100 万名演员以及每部电影 10 名演员的电影图。
我们将使用 APOC 过程中的 apoc.periodic.iterate()
来创建我们的图。
首先,让我们为电影和人员创建 100 万个节点
CALL apoc.periodic.iterate("
UNWIND range(1,1000000) as id
RETURN id
",
"
CREATE (m:Movie {id:id})
CREATE (p:Person {id:id})
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
我们现在不需要其他属性。我们正在执行的匹配和聚合不依赖于节点属性,并且除非我们将其投影出来,否则我们不会为使用它们付出代价。
现在让我们确保已到位索引
CREATE INDEX ON :Person(id);
CREATE INDEX ON :Movie(id);
现在我们有了索引,我们可以随机将 10 个人分配给每部电影作为演员
CALL apoc.periodic.iterate("
MATCH (m:Movie)
RETURN m
",
"
UNWIND range(0,10) as i
WITH m, toInteger(rand() * 1000000) as id
MATCH (p:Person {id:id})
CREATE (m)<-[:ACTED_IN]-(p)
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
对于我的特定笔记本电脑,我已经为堆配置了 4GB 的内存。实际的服务器部署往往会使用至少两倍于此的内存,最高可达 31GB,作为建议的最大值。
让我们看看我们如何使用原始查询的稍微修改版本。我想排除返回结果(包括所有这些节点的属性访问),因此我们将以 count() 聚合结束,这往往便宜得多(毕竟它只是在每行输入后递增计数,最后)。
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
WITH movie, collect(p) as actors
RETURN count(*)
分组键仍然是电影,因此我们知道必须在聚合构建时最多保存 100 万行,以及每部电影的演员列表。
根据堆内存(以及同时执行的其他查询),这可能会给堆带来压力,导致高 GC 暂停,因为内存被用完并且无法回收。我们可能会完全耗尽所有堆内存。
因此,让我们尝试一下。首先,这是 EXPLAIN 计划
我们可以看到 collect(更昂贵)和 count(便宜)的聚合。让我们尝试运行它
Started streaming 1 records after 1 ms and completed after 14907 ms.
它返回 100 万的计数(省略,因为这没有意义),但更有趣的是执行时间,或者更确切地说,一旦我们有了另一个查询来进行比较,它就会变得有趣。对我来说,这大约花了 15 秒。
此查询最有趣的部分实际上是在调试日志中
2020-10-01 04:06:31.703+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=178, gcTime=248, gcCount=1}
2020-10-01 04:06:32.893+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=254, gcTime=269, gcCount=1}
2020-10-01 04:06:34.620+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=277, gcTime=295, gcCount=1}
2020-10-01 04:06:36.506+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=328, gcTime=383, gcCount=1}
2020-10-01 04:06:38.847+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=542, gcTime=628, gcCount=1}
2020-10-01 04:06:40.937+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=346, gcTime=384, gcCount=1}
2020-10-01 04:06:42.994+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=314, gcTime=348, gcCount=1}
2020-10-01 04:06:44.965+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=241, gcTime=271, gcCount=1}
2020-10-01 04:07:04.570+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=242, gcTime=256, gcCount=1}
2020-10-01 04:08:42.469+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=169, gcTime=198, gcCount=1}
这些 GC 本身并不是很高,但这表明像这样的聚合确实会导致 GC 暂停。对于更复杂的查询或更复杂的数据集,这些暂停实际上可能会变得非常重要。
子查询缩小聚合范围
如果我们在正确的位置使用子查询并在子查询中进行聚合,我们可以缩小聚合范围,并避免需要同时在内存中体现所有这些行。
MATCH (movie:Movie)
CALL {
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors
}
RETURN count(*)
此查询应该更节省内存。
请记住,子查询是按行执行的。由于子查询之前的 MATCH,我们每部电影有一行。
MATCH 和聚合发生在子查询中,因此对于每个 collect(),它一次只考虑单个电影的路径。这意味着每个 collect() 只应用于 10 行输入(因为每部电影有 10 个演员),因此单个行的结果将非常快地可用。
请注意,这是一个权衡:我们不是对 100 万行执行单个 collect() 聚合,而是使用子查询在电影级别细分工作。因为我们有 100 万部电影,所以我们最终会进行 100 万次子查询调用,每次调用都执行自己的扩展和 collect(),因此总共 collect() 被调用了 100 万次,但每次只需要在一小部分数据上运行。
我们可以为每行输入执行子查询,对这个有限的范围执行聚合,输出结果,然后继续下一行。我们在执行该行期间使用的内存都符合垃圾回收的条件,并且在处理后续行时不需要保留在堆中。
首先让我们检查此查询的计划
请注意,我们仍然看到 collect() 的急切聚合,但它正在馈送到 Apply 操作,这表明聚合的范围仅适用于它所应用的项目,这将是每个电影节点。
让我们尝试运行它。我将省略实际的查询结果,因为我们知道它仍然是 100 万,但让我们检查一下时间
Started streaming 1 records after 1 ms and completed after 5542 ms.
重复运行会略有不同,但我们通常会在 4 到 6 秒之间结束。与原始查询的 15 秒相比,这是一个不错的改进。
调试日志中的 GC 暂停怎么样?您的里程可能会有所不同,但即使在多次重复查询执行后,我也没有看到任何记录的 GC。
这表明对大量行的聚合一次可能会占用大量内存,并且您可以通过巧妙地应用子查询来缩小聚合范围(前提是这样做对您的用例是正确的)来避免这种情况和由此产生的 GC 暂停。
模式推导类似于在子查询中调用的 collect()
模式推导可以用于类似的效果,并且从 Neo4j 3.1 开始可用。
MATCH (movie:Movie)
WITH movie, [(movie)<-[:ACTED_IN]-(p:Person) | p] as actors
RETURN count(*)
模式推导最类似于 `OPTIONAL MATCH` 后跟 `collect()`,但类似于子查询,它们在每一行执行。即使 `EXPLAIN` 计划也类似。
请注意,使用 `collect()` 渴望聚合的执行行这次馈送到 `ConditionalApply`,它是 `Apply` 的一个变体,这意味着右侧在嵌套循环中执行,这也是这些操作的作用域。
它的性能如何?
Started streaming 1 records after 1 ms and completed after 4539 ms.
重复运行时间在 4 到 6 秒之间,所以与使用子查询的版本大致相同。同样,我们在调试日志中没有看到 GC。
因此,就效率而言,无论是在时间方面还是内存方面,模式推导都与使用子查询大致相同。
虽然这比使用子查询更简洁,并且通常可以更通用(您可以在单个 `WITH` 子句中使用多个模式推导),但这些仅用于收集结果。虽然您可以获取结果列表的 `size()` 作为 `count()` 的等价物,但您不能将其用于任何其他类型的聚合。
此外,模式推导尚不支持对列表结果进行排序、跳过或限制,而如果使用子查询,则可以自由使用所有这些功能。
APOC 过程可以替代子查询
如果您没有使用 Neo4j 4.1.x 或更高版本,APOC 中有一些过程可以作为子查询来达到相同的效果。
MATCH (movie:Movie)
CALL apoc.cypher.run("
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors", {movie:movie}) YIELD value
WITH movie, value.actors as actors
RETURN count(*)
过程,就像子查询一样,在每一行执行,因此 `collect()` 聚合的范围也仅限于该特定调用中匹配的行。
由于有 100 万部电影,因此将总共有 100 万次 `apoc.cypher.run()` 调用,每次调用都执行自己的 `MATCH` 和小型 `collect()`。
我们将省略此计划,因为它不会显示任何有趣的内容。我们会看到一个过程调用操作,但由于查询的核心内容采用查询字符串的形式,因此计划程序无法评估它,因此它不会显示在计划中。
我们可以单独运行复制/粘贴的查询字符串的 `EXPLAIN`,并进行一些小的修改以使其编译,但我们已经看到了类似的计划,其中包含 `collect()` 聚合。唯一的区别是此计划将作为完全独立的事务进行计划和执行,其结果将传递到此查询的事务中。让我们看看它的表现如何。
Started streaming 1 records after 1 ms and completed after 136441 ms.
哇,这里发生了什么?时间激增到大约 2 分钟。为什么会这样?
此 APOC 过程创建并执行查询作为一个新的事务,而不是原生子查询,后者仍然在同一个事务中执行。这意味着我们实际上是通过 APOC 使用这种方法执行了 100 万个独立的事务,这在设置和执行方面会产生成本。
如果在如此多的行上运行时,时间成本如此之高,我们为什么要考虑这种方法?因为我们在调试日志中仍然没有看到 GC 暂停。
如果由于像这样的聚合导致您的查询出现 GC 和堆外问题,并且如果您没有运行足够高的版本来使用原生子查询,并且如果用例不允许您使用模式推导,那么使用某些 APOC 过程的这种方法可能会让您避免这些 GC 和堆压力,但可能会以时间为代价。
与往常一样,请对您的数据执行自己的计时测试。
此页面是否有帮助?