Spark 优化
过滤器
用于 Apache Spark 的 Neo4j 连接器实现了 SupportPushdownFilters
接口,该接口允许您将 Spark 过滤器下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 过滤,从而减少了从 Neo4j 到 Spark 的数据传输量。
您可以通过将 pushdown.filters.enabled
选项设置为 false
(默认为 true
)来手动禁用 PushdownFilters
支持。
如果您多次使用过滤函数,如本例所示
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", ":Person")
.load())
df.where("name = 'John Doe'").where("age = 32").show()
条件会自动通过 AND
运算符连接。
当使用 relationship.node.map = true 或 query 时,PushdownFilters 支持会自动禁用。在这种情况下,过滤器由 Spark 而非 Neo4j 应用。 |
聚合
用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownAggregates
接口,该接口允许您将 Spark 聚合下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 聚合,从而减少了从 Neo4j 到 Spark 的数据传输量。
您可以通过将 pushdown.aggregate.enabled
选项设置为 false
(默认为 true
)来手动禁用 PushdownAggregate 支持。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
(spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.createTempView("BOUGHT"))
val df = spark.sql(
"""SELECT `source.fullName`, MAX(`target.price`) AS max, MIN(`target.price`) AS min
|FROM BOUGHT
|GROUP BY `source.fullName`""".stripMargin)
df.show()
MAX
和 MIN
运算符直接在 Neo4j 上应用。
下推限制
用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownLimit
接口。这允许您将 Spark 限制下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 限制。这减少了从 Neo4j 到 Spark 的数据传输量。
您可以通过将 pushdown.limit.enabled
选项设置为 false
(默认为 true
)来手动禁用 PushdownLimit
支持。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.limit(10))
df.show()
limit
值将被下推到 Neo4j。
下推 Top N
用于 Apache Spark 的 Neo4j 连接器实现了 SupportsPushDownTopN
接口。这允许您将 Top N 聚合下推到 Neo4j 层。通过这种方式,Spark 接收到的数据已经由 Neo4j 聚合并限制。这减少了从 Neo4j 到 Spark 的数据传输量。
您可以通过将 pushdown.topN.enabled
选项设置为 false
(默认为 true
)来手动禁用 PushDownTopN
支持。
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.sort(col("`target.name`").desc)
.limit(10))
df.show()
limit
值将被下推到 Neo4j。
分区
在尝试提取数据时,我们提供了对提取进行分区以实现并行化的可能性。
请考虑以下任务
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", "Person")
.option("partitions", "5")
.load())
这意味着,如果 Neo4j 中带有 Person
标签的节点总数为 100,我们将创建 5 个分区,每个分区管理 20 条记录(我们使用 SKIP / LIMIT
查询)。
仅当您处理大型数据集(>=
1000 万条记录)时,对数据集进行分区才有意义。
如何并行化查询执行
有三种选项可用
-
节点提取。
-
关系提取。
-
查询提取。
系统会提供您尝试提取的总计数,并为每个分区构建一个采用 SKIP / LIMIT
方法的查询。
因此,对于包含 100 个节点(Person
)且分区大小为 5 的数据集,将生成以下查询(每个分区一个)
MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20
对于节点和关系提取,您可以利用 Neo4j 计数存储 来检索您尝试提取的节点/关系的总体计数,而对于 (3) 您有两种可能的方法
-
计算您正在使用的查询的总计数。
-
通过第二个利用索引的优化查询来计算全局计数。在这种情况下,您可以通过
.option("query.count", "<your cypher query>")
传递它。
该选项可以通过两种方式设置
-
作为整数:
.option("query.count", 100)
-
作为查询,该查询必须始终只返回一个名为
count
的字段,该字段是计数的结果
MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count