使用 Neo4j GDS 进行数据科学
Neo4j 图数据科学 (GDS) 让数据科学家能够利用强大的图算法。它提供了无监督机器学习 (ML) 方法和启发式算法,用于学习和描述图的拓扑结构。GDS 包含经过强化的图算法,具有企业级功能,例如用于一致结果和可重复 ML 工作流的确定性种子。
GDS 算法分为五组:
-
社区检测:用于检测组集群和分区选项。
-
中心性:用于计算图中节点的重要性。
-
拓扑链接预测:用于估计节点形成关系的可能性。
-
相似性:用于评估节点对的相似性。
-
路径查找与搜索:用于查找最佳路径、评估路线可用性等。
GDS 通过 Cypher 运行
GDS 的所有功能 都通过发出 Cypher® 查询来使用。因此,它可以通过 Spark 轻松访问,因为 Neo4j Connector for Apache Spark 可以发出 Cypher 查询并读取其结果。这种组合意味着您可以将 Neo4j 和 GDS 作为图协处理器用于您可能在 Apache Spark 中实现的现有 ML 工作流。
GDS 一流支持
Neo4j Spark Connector 为图数据科学库提供一流支持,让我们看看它如何工作。
限制
我们不支持 mutate
或 write
过程模式,因为它们不会在数据帧中返回任何可用的信息。您可以通过连接数据帧,然后使用 Neo4j Spark 连接器将数据写回您想要的任何 Neo4j 实例来达到相同的效果。
示例
假设我们想重现 GDS 手册中此处详细描述的 Page Rank 算法示例。相关的 Spark 示例如下所示(我们假设数据已在 Neo4j 中)。
-
创建投影图:以下 Python 代码将使用原生投影投影一个图,并将其存储在图目录中,名称为
myGraph
。
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(truncate=False)
)
这将显示如下结果:
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection |relationshipProjection |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph |8 |14 |503 |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
-
估算算法成本:使用以下 Python 代码,我们将使用估算过程估算运行算法的成本:
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream.estimate")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(truncate=False)
)
这将显示如下结果:
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView |mapView |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n |-- this.instance: 88 Bytes\n |-- vote bits: 104 Bytes\n |-- compute steps: 208 Bytes\n |-- this.instance: 104 Bytes\n |-- node value: 120 Bytes\n |-- pagerank (DOUBLE): 120 Bytes\n |-- message arrays: 296 Bytes\n |-- this.instance: 56 Bytes\n |-- send array: 120 Bytes\n |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816 |816 |8 |14 |0.1 |0.1 |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
-
计算算法:以下 Python 代码将返回 Page Rank 计算结果,而不修改图:
val pr_df = spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
)
pr_df.show(truncate=False)
这将显示如下结果:
+------+------------------+
|nodeId|score |
+------+------------------+
|0 |3.215681999884452 |
|1 |1.0542700552146722|
|2 |1.0542700552146722|
|3 |1.0542700552146722|
|4 |0.3278578964488539|
|5 |0.3278578964488539|
|6 |0.3278578964488539|
|7 |0.3278578964488539|
+------+------------------+
如您所见,我们现在只有 nodeId
和 score
两列,让我们看看如何用分数丰富您的节点。
-
用分数丰富节点:以下 Python 代码将用分数丰富节点:
# we'll assume that `spark` variable is already present
# we create the `nodes_df`
nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://localhost:7687") \
.option("labels", "Page") \
.load()
# we join `nodes_df` with `pr_df` created in the step before
new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
new_df.show(truncate=False)
这将显示如下结果:
+----+--------+-------+------+------------------+
|<id>|<labels>| name|nodeId| score|
+----+--------+-------+------+------------------+
| 0| [Page]| Home| 0| 3.215681999884452|
| 1| [Page]| About| 1|1.0542700552146722|
| 2| [Page]|Product| 2|1.0542700552146722|
| 3| [Page]| Links| 3|1.0542700552146722|
| 4| [Page]| Site A| 4|0.3278578964488539|
| 5| [Page]| Site B| 5|0.3278578964488539|
| 6| [Page]| Site C| 6|0.3278578964488539|
| 7| [Page]| Site D| 7|0.3278578964488539|
+----+--------+-------+------+------------------+
现在您可以将此数据集持久化到您想要的任何 Neo4j 实例。
选项
正如您从上面的示例中可能理解的,您可以使用 gds.
前缀和点表示法来支持嵌套映射,从而传递所有必需的选项。
设置名称 | 描述 | 默认值 | 必需 |
---|---|---|---|
GDS 选项 |
|||
|
过程名称。您可以从 GDS 手册中以下页面选择适合您用例的算法 |
(无) |
是 |
|
设置名称只是一个前缀,需要与您选择的过程的输入选项一起完整。 |
(无) |
是,它与您选择的过程相关 |
如何在 Spark Job 中管理 gds.
前缀
例如,假设您想要投影一个图,如下所示:
---
CALL gds.graph.project(
'myGraph',
'Page',
'LINKS',
{
relationshipProperties: 'weight'
}
)
---
所以我们需要:
-
调用
gds.graph.project
,这将导致在我们的 Spark Job 中添加.option("gds", "gds.graph.project")
。project
过程,如您在 ink:https://neo4j.ac.cn/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^] 所见,有 4 个输入参数:-
graphName
:我们想将图命名为myGraph
;这导致添加.option("gds.graphName", "myGraph")
-
nodeProjection
:我们想投影Page
节点;这导致添加.option("gds.nodeProjection", "Page")
-
relationshipProjection
:我们想投影LINKS
关系;这导致添加.option("gds.relationshipProjection", "LINKS")
-
configuration
:我们想将weight
配置为定义关系重要性的属性;configuration
是一个映射,我们需要添加一个relationshipProperties
键,其值为weight
到我们的映射中。我们可以通过点表示法实现这一点,这导致添加.option("gds.configuration.relationshipProperties", "weight")
-
所以最终的 Spark 作业将如下所示:
# we'll assume that `spark` variable is already present
spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://localhost:7687") \
.option("gds", "gds.graph.project") \
.option("gds.graphName", "myGraph") \
.option("gds.nodeProjection", "Page") \
.option("gds.relationshipProjection", "LINKS") \
.option("gds.configuration.relationshipProperties", "weight") \
.load() \
.show(truncate=False)
通过 Cypher 查询支持 GDS
在此模式下,您可以使用复杂的自定义查询,以便通过 GDS 分析数据。
示例
在示例 Zeppelin Notebook 存储库中,有一个 GDS 示例可以针对 Neo4j Sandbox 运行,演示如何将两者结合使用。
使用 Spark 在 GDS 中创建虚拟图
这是一个非常简单、直接的代码;它构建了正确的 Cypher 语句,用于在 GDS 中创建虚拟图并返回结果。
%pyspark
query = """
CALL gds.graph.project('got-interactions', 'Person', {
INTERACTS: {
orientation: 'UNDIRECTED'
}
})
YIELD graphName, nodeCount, relationshipCount, projectMillis
RETURN graphName, nodeCount, relationshipCount, projectMillis
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
如果您收到“A graph with name [name] already exists”(名称为 [name] 的图已存在)错误,请查看此常见问题。 |
确保选项 partitions
设置为 1。您不希望并行执行此查询,它应该只执行一次。
当您使用存储过程时,必须包含 RETURN
子句。
运行 GDS 分析并流式传输结果
以下示例展示了如何运行分析并将结果作为另一个 Cypher 查询获取,该查询作为 Spark 从 Neo4j 读取的操作执行。
%pyspark
query = """
CALL gds.pageRank.stream('got-interactions')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
df.show()
确保选项 partitions 设置为 1。算法应该只执行一次。
|
GDS 结果的流式传输与持久化
在运行 GDS 算法时,库提供了将算法结果流式传输回调用者或修改底层图的选项。将 GDS 与 Spark 结合使用提供了转换或以其他方式使用 GDS 结果的额外选项。最终,两种模式都适用于 Neo4j Connector for Apache Spark,您可以根据自己的用例选择最佳方案。
如果您的架构中 GDS 算法正在只读副本或单独的独立实例上运行,那么流式传输结果可能很方便(因为您无法将它们写入只读副本),然后使用连接器的写入功能获取该结果流,并将其写入不同的 Neo4j 连接,即写入常规的因果集群。