使用 Neo4j GDS 的数据科学
Neo4j 图数据科学 (GDS) 使数据科学家能够利用强大的图算法。它提供无监督机器学习 (ML) 方法和启发式方法,用于学习和描述图的拓扑结构。GDS 包括具有企业功能的强化图算法,例如确定性种子以获得一致的结果和可重现的 ML 工作流。
GDS 算法分为五组
-
社区检测,检测组集群和分区选项。
-
中心性,帮助计算图中节点的重要性。
-
拓扑链接预测,估计节点形成关系的可能性。
-
相似性,评估节点对的相似性。
-
路径查找和搜索,查找最优路径,评估路线可用性等。
GDS 通过 Cypher 操作
所有 GDS 的功能 都是通过发出 Cypher® 查询来使用的。因此,它可以通过 Spark 轻松访问,因为 Neo4j Apache Spark 连接器可以发出 Cypher 查询并读取其结果。这种组合意味着您可以将 Neo4j 和 GDS 作为图协处理器用于您可能在 Apache Spark 中实现的现有 ML 工作流。
GDS 一流支持
Neo4j Spark 连接器为图数据科学库提供一流的支持,让我们看看它是如何工作的。
限制
我们不支持 mutate
或 write
过程模式,因为它们不会在数据帧中返回任何可用信息,您可以通过连接数据帧然后使用 Neo4j Spark 连接器将数据写回所需的任何 Neo4j 实例来实现相同的功能。
示例
假设我们要复制此处 GDS 手册中详述的 Page Rank 算法示例。相关的 Spark 示例如下所示(我们假设数据已在 Neo4j 中)。
-
创建投影图:以下 Python 代码将使用原生投影投影图,并将其存储在名为
myGraph
的图目录中。
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(truncate=False)
)
这将显示如下结果
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection |relationshipProjection |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph |8 |14 |503 |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
-
估算算法成本:使用以下 Python 代码,我们将估算使用 estimate 过程运行算法的成本
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream.estimate")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(truncate=False)
)
这将显示如下结果
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView |mapView |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n |-- this.instance: 88 Bytes\n |-- vote bits: 104 Bytes\n |-- compute steps: 208 Bytes\n |-- this.instance: 104 Bytes\n |-- node value: 120 Bytes\n |-- pagerank (DOUBLE): 120 Bytes\n |-- message arrays: 296 Bytes\n |-- this.instance: 56 Bytes\n |-- send array: 120 Bytes\n |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816 |816 |8 |14 |0.1 |0.1 |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
-
计算算法:以下 Python 代码将返回 Page Rank 计算结果,而不会修改图
val pr_df = spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
)
pr_df.show(truncate=False)
这将显示如下结果
+------+------------------+
|nodeId|score |
+------+------------------+
|0 |3.215681999884452 |
|1 |1.0542700552146722|
|2 |1.0542700552146722|
|3 |1.0542700552146722|
|4 |0.3278578964488539|
|5 |0.3278578964488539|
|6 |0.3278578964488539|
|7 |0.3278578964488539|
+------+------------------+
如您所见,我们现在只有两列 nodeId
和 score
,让我们看看如何使用分数丰富您的节点。
-
使用分数丰富节点:以下 Python 代码将使用分数丰富节点
# we'll assume that `spark` variable is already present
# we create the `nodes_df`
nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://127.0.0.1:7687") \
.option("labels", "Page") \
.load()
# we join `nodes_df` with `pr_df` created in the step before
new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
new_df.show(truncate=False)
这将显示如下结果
+----+--------+-------+------+------------------+
|<id>|<labels>| name|nodeId| score|
+----+--------+-------+------+------------------+
| 0| [Page]| Home| 0| 3.215681999884452|
| 1| [Page]| About| 1|1.0542700552146722|
| 2| [Page]|Product| 2|1.0542700552146722|
| 3| [Page]| Links| 3|1.0542700552146722|
| 4| [Page]| Site A| 4|0.3278578964488539|
| 5| [Page]| Site B| 5|0.3278578964488539|
| 6| [Page]| Site C| 6|0.3278578964488539|
| 7| [Page]| Site D| 7|0.3278578964488539|
+----+--------+-------+------+------------------+
现在,您可以将此数据集持久化到所需的任何 Neo4j 实例。
选项
从上面的示例中您可以理解,您可以使用 gds.
前缀和嵌套映射的点表示法支持传递所有必需的选项。
设置名称 | 描述 | 默认值 | 必需 |
---|---|---|---|
GDS 选项 |
|||
|
过程名称。您可以从以下 GDS 手册页面 中选择适合您用例的算法 |
(无) |
是 |
|
设置名称只是一个需要与您选择的过程的输入选项一起完成的前缀。 |
(无) |
是,它与您选择的程序相关 |
如何在 Spark 作业中管理 gds.
前缀
例如,假设您想投影一个图。如下所示
---
CALL gds.graph.project(
'myGraph',
'Page',
'LINKS',
{
relationshipProperties: 'weight'
}
)
---
所以我们需要
-
调用
gds.graph.project
,这导致在我们的 Spark 作业中添加.option("gds", "gds.graph.project")
。project
过程,如您在ink:https://neo4j.ac.cn/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^]中看到的,有 4 个输入参数-
graphName
:我们想将图命名为myGraph
;这导致添加.option("gds.graphName", "myGraph")
-
nodeProjection
:我们想投影Page
节点;这导致添加.option("gds.nodeProjection", "Page")
-
relationshipProjection
:我们想投影LINKS
关系;这导致添加.option("gds.relationshipProjection", "LINKS")
-
configuration
:我们想将weight
配置为定义关系重要性的属性;configuration 是一个映射,我们需要添加一个带有值weight
的relationshipPropertis
键到我们的映射中,我们可以通过点表示法来做到这一点,这导致添加.option("gds.configuration.relationshipProperties", "weight")
-
因此,最终的 Spark 作业将如下所示
# we'll assume that `spark` variable is already present
spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://127.0.0.1:7687") \
.option("gds", "gds.graph.project") \
.option("gds.graphName", "myGraph") \
.option("gds.nodeProjection", "Page") \
.option("gds.relationshipProjection", "LINKS") \
.option("gds.configuration.relationshipProperties", "weight") \
.load() \
.show(truncate=False)
通过 Cypher 查询支持 GDS
使用此模式,您可以使用复杂的自定义查询来使用 GDS 分析您的数据。
示例
在 示例 Zeppelin Notebook 存储库 中,有一个可以针对 Neo4j Sandbox 运行的 GDS 示例,展示了如何将两者结合使用。
使用 Spark 在 GDS 中创建虚拟图
这是一个非常简单、直接的代码;它构造了正确的 Cypher 语句以 在 GDS 中创建虚拟图 并返回结果。
%pyspark
query = """
CALL gds.graph.project('got-interactions', 'Person', {
INTERACTS: {
orientation: 'UNDIRECTED'
}
})
YIELD graphName, nodeCount, relationshipCount, projectMillis
RETURN graphName, nodeCount, relationshipCount, projectMillis
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
如果您收到名为 [name] 的图已存在错误,请查看此 常见问题解答。 |
确保选项 partitions
设置为 1。您不希望并行执行此查询,它应该只执行一次。
当您使用存储过程时,必须包含 RETURN
子句。
运行 GDS 分析并将结果回传
以下示例演示了如何运行分析并将结果作为另一个 Cypher 查询获取,作为从 Neo4j 读取的 Spark 读取执行。
%pyspark
query = """
CALL gds.pageRank.stream('got-interactions')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
df.show()
确保选项 partitions 设置为 1。算法应只执行一次。
|
流式传输与持久化 GDS 结果
当 运行 GDS 算法 时,库会为您提供两种选择:将算法结果回传给调用方,或修改底层图。将 GDS 与 Spark 结合使用提供了转换或以其他方式使用 GDS 结果的额外选项。最终,两种模式都适用于 Neo4j Apache Spark 连接器,您可以选择最适合您用例的模式。
如果您拥有一个在只读副本或单独的独立实例上运行 GDS 算法的架构,那么将结果回传(因为您无法将它们写入只读副本),然后使用连接器的写入功能来获取该结果流,并将它们写回不同的 Neo4j 连接(例如,到常规因果集群)可能会很方便。