Spark 优化
过滤器
Neo4j Connector for Apache Spark 实现 SupportPushdownFilters
接口,允许您将 Spark 过滤器下推到 Neo4j 层。这样,Spark 收到的数据将已经由 Neo4j 过滤,减少了从 Neo4j 传输到 Spark 的数据量。
您可以使用 pushdown.filters.enabled
选项手动禁用 PushdownFilters
支持,并将其设置为 false
(默认值为 true
)。
如果您多次使用 filter 函数,例如在本例中
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://127.0.0.1:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", ":Person")
.load())
df.where("name = 'John Doe'").where("age = 32").show()
这些条件会自动使用 AND
运算符连接起来。
当使用 relationship.node.map = true 或 query 时,PushdownFilters 支持会自动禁用。在这种情况下,过滤器由 Spark 应用,而不是由 Neo4j 应用。 |
聚合
Neo4j Connector for Apache Spark 实现 SupportsPushDownAggregates
接口,允许您将 Spark 聚合下推到 Neo4j 层。这样,Spark 收到的数据将已经由 Neo4j 聚合,减少了从 Neo4j 传输到 Spark 的数据量。
您可以使用 pushdown.aggregate.enabled
选项手动禁用 PushdownAggregate 支持,并将其设置为 false
(默认值为 true
)。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
(spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://127.0.0.1:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.createTempView("BOUGHT"))
val df = spark.sql(
"""SELECT `source.fullName`, MAX(`target.price`) AS max, MIN(`target.price`) AS min
|FROM BOUGHT
|GROUP BY `source.fullName`""".stripMargin)
df.show()
MAX
和 MIN
运算符直接在 Neo4j 上应用。
下推限制
Neo4j Connector for Apache Spark 实现 SupportsPushDownLimit
接口。这允许您将 Spark 限制下推到 Neo4j 层。这样,Spark 收到的数据将已经由 Neo4j 限制。这减少了从 Neo4j 传输到 Spark 的数据量。
您可以使用 pushdown.limit.enabled
选项手动禁用 PushdownLimit
支持,并将其设置为 false
(默认值为 true
)。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://127.0.0.1:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.limit(10))
df.show()
limit
值将下推到 Neo4j。
下推前 N 个
Neo4j Connector for Apache Spark 实现 SupportsPushDownTopN
接口。这允许您将前 N 个聚合下推到 Neo4j 层。这样,Spark 收到的数据将已经由 Neo4j 聚合和限制。这减少了从 Neo4j 传输到 Spark 的数据量。
您可以使用 pushdown.topN.enabled
选项手动禁用 PushDownTopN
支持,并将其设置为 false
(默认值为 true
)。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://127.0.0.1:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.sort(col("`target.name`").desc)
.limit(10))
df.show()
limit
值将下推到 Neo4j。
分区
当我们试图提取数据时,我们提供了对提取进行分区的可能性,以便对其进行并行化。
请考虑以下作业
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://127.0.0.1:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", "Person")
.option("partitions", "5")
.load())
这意味着,如果 Neo4j 中具有标签 Person
的节点的总数为 100,我们将创建 5 个分区,每个分区管理 20 条记录(我们使用 SKIP / LIMIT
查询)。
仅当您处理大型数据集(>= 10M 条记录)时,对数据集进行分区才有意义。
如何并行化查询执行
有三个选项可用
-
节点提取。
-
关系提取。
-
查询提取。
将提供有关您试图提取的内容的一般计数,并将针对每个分区构建具有 SKIP / LIMIT
方法的查询。
因此,对于包含 100 个节点(Person
)且分区大小为 5 的数据集,将生成以下查询(每个分区一个)
MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20
对于节点和关系提取,您利用 Neo4j 计数存储 来检索有关您试图提取的节点/关系的总数,对于 (3),您有两种可能的方法
-
计算您正在使用的查询的计数。
-
计算利用索引的第二个**优化**查询的全局计数。在这种情况下,您可以通过
.option("query.count", "<您的 Cypher 查询>")
传递它。
该选项可以通过两种方式设置
-
作为整数:
.option("query.count", 100)
-
作为查询,它必须始终仅返回一个名为
count
的字段,该字段是计数结果
MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count