使用 Neo4j GDS 的数据科学

Neo4j 图数据科学 (GDS) 使数据科学家能够利用强大的图算法。它提供无监督机器学习 (ML) 方法和启发式方法,用于学习和描述图的拓扑结构。GDS 包括具有企业功能的强化图算法,例如确定性种子以获得一致的结果和可重现的 ML 工作流。

GDS 算法分为五组

  • 社区检测,检测组集群和分区选项。

  • 中心性,帮助计算图中节点的重要性。

  • 拓扑链接预测,估计节点形成关系的可能性。

  • 相似性,评估节点对的相似性。

  • 路径查找和搜索,查找最优路径,评估路线可用性等。

GDS 通过 Cypher 操作

所有 GDS 的功能 都是通过发出 Cypher® 查询来使用的。因此,它可以通过 Spark 轻松访问,因为 Neo4j Apache Spark 连接器可以发出 Cypher 查询并读取其结果。这种组合意味着您可以将 Neo4j 和 GDS 作为图协处理器用于您可能在 Apache Spark 中实现的现有 ML 工作流。

GDS 一流支持

Neo4j Spark 连接器为图数据科学库提供一流的支持,让我们看看它是如何工作的。

限制

我们不支持 mutatewrite 过程模式,因为它们不会在数据帧中返回任何可用信息,您可以通过连接数据帧然后使用 Neo4j Spark 连接器将数据写回所需的任何 Neo4j 实例来实现相同的功能。

示例

假设我们要复制此处 GDS 手册中详述的 Page Rank 算法示例。相关的 Spark 示例如下所示(我们假设数据已在 Neo4j 中)。

  1. 创建投影图:以下 Python 代码将使用原生投影投影图,并将其存储在名为 myGraph 的图目录中。

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(truncate=False)
)

这将显示如下结果

+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection                            |relationshipProjection                                                                                                                                                                    |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph  |8        |14               |503          |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
  1. 估算算法成本:使用以下 Python 代码,我们将估算使用 estimate 过程运行算法的成本

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream.estimate")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(truncate=False)
)

这将显示如下结果

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView                                                                                                                                                                                                                                                                                                                                                                                                                 |mapView                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes     |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n    |-- this.instance: 88 Bytes\n    |-- vote bits: 104 Bytes\n    |-- compute steps: 208 Bytes\n        |-- this.instance: 104 Bytes\n    |-- node value: 120 Bytes\n        |-- pagerank (DOUBLE): 120 Bytes\n    |-- message arrays: 296 Bytes\n        |-- this.instance: 56 Bytes\n        |-- send array: 120 Bytes\n        |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816     |816     |8        |14               |0.1              |0.1              |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
  1. 计算算法:以下 Python 代码将返回 Page Rank 计算结果,而不会修改图

val pr_df = spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()

pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
)

pr_df.show(truncate=False)

这将显示如下结果

+------+------------------+
|nodeId|score             |
+------+------------------+
|0     |3.215681999884452 |
|1     |1.0542700552146722|
|2     |1.0542700552146722|
|3     |1.0542700552146722|
|4     |0.3278578964488539|
|5     |0.3278578964488539|
|6     |0.3278578964488539|
|7     |0.3278578964488539|
+------+------------------+

如您所见,我们现在只有两列 nodeIdscore,让我们看看如何使用分数丰富您的节点。

  1. 使用分数丰富节点:以下 Python 代码将使用分数丰富节点

    # we'll assume that `spark` variable is already present
    # we create the `nodes_df`
    nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://127.0.0.1:7687") \
      .option("labels", "Page") \
      .load()

    # we join `nodes_df` with `pr_df` created in the step before
    new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
    new_df.show(truncate=False)

这将显示如下结果

+----+--------+-------+------+------------------+
|<id>|<labels>|   name|nodeId|             score|
+----+--------+-------+------+------------------+
|   0|  [Page]|   Home|     0| 3.215681999884452|
|   1|  [Page]|  About|     1|1.0542700552146722|
|   2|  [Page]|Product|     2|1.0542700552146722|
|   3|  [Page]|  Links|     3|1.0542700552146722|
|   4|  [Page]| Site A|     4|0.3278578964488539|
|   5|  [Page]| Site B|     5|0.3278578964488539|
|   6|  [Page]| Site C|     6|0.3278578964488539|
|   7|  [Page]| Site D|     7|0.3278578964488539|
+----+--------+-------+------+------------------+

现在,您可以将此数据集持久化到所需的任何 Neo4j 实例。

选项

从上面的示例中您可以理解,您可以使用 gds. 前缀和嵌套映射的点表示法支持传递所有必需的选项。

表 1. 可用配置设置列表
设置名称 描述 默认值 必需

GDS 选项

gds

过程名称。您可以从以下 GDS 手册页面 中选择适合您用例的算法

(无)

gds.

设置名称只是一个需要与您选择的过程的输入选项一起完成的前缀。

(无)

是,它与您选择的程序相关

如何在 Spark 作业中管理 gds. 前缀

例如,假设您想投影一个图。如下所示

---
CALL gds.graph.project(
  'myGraph',
  'Page',
  'LINKS',
  {
    relationshipProperties: 'weight'
  }
)
---

所以我们需要

  • 调用 gds.graph.project,这导致在我们的 Spark 作业中添加 .option("gds", "gds.graph.project")project 过程,如您在ink:https://neo4j.ac.cn/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^]中看到的,有 4 个输入参数

    • graphName:我们想将图命名为 myGraph;这导致添加 .option("gds.graphName", "myGraph")

    • nodeProjection:我们想投影 Page 节点;这导致添加 .option("gds.nodeProjection", "Page")

    • relationshipProjection:我们想投影 LINKS 关系;这导致添加 .option("gds.relationshipProjection", "LINKS")

    • configuration:我们想将 weight 配置为定义关系重要性的属性;configuration 是一个映射,我们需要添加一个带有值 weightrelationshipPropertis 键到我们的映射中,我们可以通过点表示法来做到这一点,这导致添加 .option("gds.configuration.relationshipProperties", "weight")

因此,最终的 Spark 作业将如下所示

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://127.0.0.1:7687") \
      .option("gds", "gds.graph.project") \
      .option("gds.graphName", "myGraph") \
      .option("gds.nodeProjection", "Page") \
      .option("gds.relationshipProjection", "LINKS") \
      .option("gds.configuration.relationshipProperties", "weight") \
      .load() \
      .show(truncate=False)

通过 Cypher 查询支持 GDS

使用此模式,您可以使用复杂的自定义查询来使用 GDS 分析您的数据。

示例

示例 Zeppelin Notebook 存储库 中,有一个可以针对 Neo4j Sandbox 运行的 GDS 示例,展示了如何将两者结合使用。

使用 Spark 在 GDS 中创建虚拟图

这是一个非常简单、直接的代码;它构造了正确的 Cypher 语句以 在 GDS 中创建虚拟图 并返回结果。

%pyspark
query = """
    CALL gds.graph.project('got-interactions', 'Person', {
      INTERACTS: {
        orientation: 'UNDIRECTED'
      }
    })
    YIELD graphName, nodeCount, relationshipCount, projectMillis
    RETURN graphName, nodeCount, relationshipCount, projectMillis
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()
如果您收到名为 [name] 的图已存在错误,请查看此 常见问题解答

确保选项 partitions 设置为 1。您不希望并行执行此查询,它应该只执行一次。

当您使用存储过程时,必须包含 RETURN 子句。

运行 GDS 分析并将结果回传

以下示例演示了如何运行分析并将结果作为另一个 Cypher 查询获取,作为从 Neo4j 读取的 Spark 读取执行。

%pyspark

query = """
    CALL gds.pageRank.stream('got-interactions')
    YIELD nodeId, score
    RETURN gds.util.asNode(nodeId).name AS name, score
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()

df.show()
确保选项 partitions 设置为 1。算法应只执行一次。

流式传输与持久化 GDS 结果

运行 GDS 算法 时,库会为您提供两种选择:将算法结果回传给调用方,或修改底层图。将 GDS 与 Spark 结合使用提供了转换或以其他方式使用 GDS 结果的额外选项。最终,两种模式都适用于 Neo4j Apache Spark 连接器,您可以选择最适合您用例的模式。

如果您拥有一个在只读副本或单独的独立实例上运行 GDS 算法的架构,那么将结果回传(因为您无法将它们写入只读副本),然后使用连接器的写入功能来获取该结果流,并将它们写回不同的 Neo4j 连接(例如,到常规因果集群)可能会很方便。