Spark 优化

过滤器

用于 Apache Spark 的 Neo4j 连接器实现了 SupportPushdownFilters 接口,该接口允许您将 Spark 过滤器下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 过滤,从而减少了从 Neo4j 到 Spark 的数据传输量。

您可以通过将 pushdown.filters.enabled 选项设置为 false(默认为 true)来手动禁用 PushdownFilters 支持。

如果您多次使用过滤函数,如本例所示

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("labels", ":Person")
  .load())

df.where("name = 'John Doe'").where("age = 32").show()

条件会自动通过 AND 运算符连接。

当使用 relationship.node.map = truequery 时,PushdownFilters 支持会自动禁用。在这种情况下,过滤器由 Spark 而非 Neo4j 应用。

聚合

用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownAggregates 接口,该接口允许您将 Spark 聚合下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 聚合,从而减少了从 Neo4j 到 Spark 的数据传输量。

您可以通过将 pushdown.aggregate.enabled 选项设置为 false(默认为 true)来手动禁用 PushdownAggregate 支持。

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()

(spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load
  .createTempView("BOUGHT"))


val df = spark.sql(
  """SELECT `source.fullName`, MAX(`target.price`) AS max, MIN(`target.price`) AS min
    |FROM BOUGHT
    |GROUP BY `source.fullName`""".stripMargin)

df.show()

MAXMIN 运算符直接在 Neo4j 上应用。

下推限制

用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownLimit 接口。这允许您将 Spark 限制下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 限制。这减少了从 Neo4j 到 Spark 的数据传输量。

您可以通过将 pushdown.limit.enabled 选项设置为 false(默认为 true)来手动禁用 PushdownLimit 支持。

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()

val df = (spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "neo4j://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "letmein!")
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Person")
      .option("relationship.target.labels", "Product")
      .load
      .select("`target.name`", "`target.id`")
      .limit(10))


df.show()

limit 值将被下推到 Neo4j。

下推 Top N

用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownTopN 接口。这允许您将 Top N 聚合下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 聚合并限制。这减少了从 Neo4j 到 Spark 的数据传输量。

您可以通过将 pushdown.topN.enabled 选项设置为 false(默认为 true)来手动禁用 PushDownTopN 支持。

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()

val df = (spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "neo4j://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "letmein!")
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Person")
      .option("relationship.target.labels", "Product")
      .load
      .select("`target.name`", "`target.id`")
      .sort(col("`target.name`").desc)
      .limit(10))


df.show()

limit 值将被下推到 Neo4j。

分区

在尝试提取数据时,我们提供了对提取进行分区以实现并行化的可能性。

请考虑以下任务

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = (spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://localhost:7687")
        .option("authentication.basic.username", "neo4j")
        .option("authentication.basic.password", "letmein!")
        .option("labels", "Person")
        .option("partitions", "5")
        .load())

这意味着,如果 Neo4j 中带有 Person 标签的节点总数为 100,我们将创建 5 个分区,每个分区管理 20 条记录(我们使用 SKIP / LIMIT 查询)。

仅当您处理大型数据集(>= 1000 万条记录)时,对数据集进行分区才有意义。

如何并行化查询执行

有三种选项可用

  1. 节点提取。

  2. 关系提取。

  3. 查询提取。

系统会提供您尝试提取的总计数,并为每个分区构建一个采用 SKIP / LIMIT 方法的查询。

因此,对于包含 100 个节点(Person)且分区大小为 5 的数据集,将生成以下查询(每个分区一个)

MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20

对于节点和关系提取,您可以利用 Neo4j 计数存储 来检索您尝试提取的节点/关系的总体计数,而对于 (3) 您有两种可能的方法

  • 计算您正在使用的查询的总计数。

  • 通过第二个利用索引的优化查询来计算全局计数。在这种情况下,您可以通过 .option("query.count", "<your cypher query>") 传递它。

该选项可以通过两种方式设置

  • 作为整数:.option("query.count", 100)

  • 作为查询,该查询必须始终只返回一个名为 count 的字段,该字段是计数的结果

MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count
© . All rights reserved.