使用 Neo4j GDS 进行数据科学

Neo4j 图数据科学 (GDS) 让数据科学家能够利用强大的图算法。它提供了无监督机器学习 (ML) 方法和启发式算法,用于学习和描述图的拓扑结构。GDS 包含经过强化的图算法,具有企业级功能,例如用于一致结果和可重复 ML 工作流的确定性种子。

GDS 算法分为五组:

  • 社区检测:用于检测组集群和分区选项。

  • 中心性:用于计算图中节点的重要性。

  • 拓扑链接预测:用于估计节点形成关系的可能性。

  • 相似性:用于评估节点对的相似性。

  • 路径查找与搜索:用于查找最佳路径、评估路线可用性等。

GDS 通过 Cypher 运行

GDS 的所有功能 都通过发出 Cypher® 查询来使用。因此,它可以通过 Spark 轻松访问,因为 Neo4j Connector for Apache Spark 可以发出 Cypher 查询并读取其结果。这种组合意味着您可以将 Neo4j 和 GDS 作为图协处理器用于您可能在 Apache Spark 中实现的现有 ML 工作流。

GDS 一流支持

Neo4j Spark Connector 为图数据科学库提供一流支持,让我们看看它如何工作。

限制

我们不支持 mutatewrite 过程模式,因为它们不会在数据帧中返回任何可用的信息。您可以通过连接数据帧,然后使用 Neo4j Spark 连接器将数据写回您想要的任何 Neo4j 实例来达到相同的效果。

示例

假设我们想重现 GDS 手册中此处详细描述的 Page Rank 算法示例。相关的 Spark 示例如下所示(我们假设数据已在 Neo4j 中)。

  1. 创建投影图:以下 Python 代码将使用原生投影投影一个图,并将其存储在图目录中,名称为 myGraph

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(truncate=False)
)

这将显示如下结果:

+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection                            |relationshipProjection                                                                                                                                                                    |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph  |8        |14               |503          |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
  1. 估算算法成本:使用以下 Python 代码,我们将使用估算过程估算运行算法的成本:

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream.estimate")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(truncate=False)
)

这将显示如下结果:

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView                                                                                                                                                                                                                                                                                                                                                                                                                 |mapView                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes     |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n    |-- this.instance: 88 Bytes\n    |-- vote bits: 104 Bytes\n    |-- compute steps: 208 Bytes\n        |-- this.instance: 104 Bytes\n    |-- node value: 120 Bytes\n        |-- pagerank (DOUBLE): 120 Bytes\n    |-- message arrays: 296 Bytes\n        |-- this.instance: 56 Bytes\n        |-- send array: 120 Bytes\n        |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816     |816     |8        |14               |0.1              |0.1              |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
  1. 计算算法:以下 Python 代码将返回 Page Rank 计算结果,而不修改图:

val pr_df = spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()

pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
)

pr_df.show(truncate=False)

这将显示如下结果:

+------+------------------+
|nodeId|score             |
+------+------------------+
|0     |3.215681999884452 |
|1     |1.0542700552146722|
|2     |1.0542700552146722|
|3     |1.0542700552146722|
|4     |0.3278578964488539|
|5     |0.3278578964488539|
|6     |0.3278578964488539|
|7     |0.3278578964488539|
+------+------------------+

如您所见,我们现在只有 nodeIdscore 两列,让我们看看如何用分数丰富您的节点。

  1. 用分数丰富节点:以下 Python 代码将用分数丰富节点:

    # we'll assume that `spark` variable is already present
    # we create the `nodes_df`
    nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("labels", "Page") \
      .load()

    # we join `nodes_df` with `pr_df` created in the step before
    new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
    new_df.show(truncate=False)

这将显示如下结果:

+----+--------+-------+------+------------------+
|<id>|<labels>|   name|nodeId|             score|
+----+--------+-------+------+------------------+
|   0|  [Page]|   Home|     0| 3.215681999884452|
|   1|  [Page]|  About|     1|1.0542700552146722|
|   2|  [Page]|Product|     2|1.0542700552146722|
|   3|  [Page]|  Links|     3|1.0542700552146722|
|   4|  [Page]| Site A|     4|0.3278578964488539|
|   5|  [Page]| Site B|     5|0.3278578964488539|
|   6|  [Page]| Site C|     6|0.3278578964488539|
|   7|  [Page]| Site D|     7|0.3278578964488539|
+----+--------+-------+------+------------------+

现在您可以将此数据集持久化到您想要的任何 Neo4j 实例。

选项

正如您从上面的示例中可能理解的,您可以使用 gds. 前缀和点表示法来支持嵌套映射,从而传递所有必需的选项。

表 1. 可用配置设置列表
设置名称 描述 默认值 必需

GDS 选项

gds

过程名称。您可以从 GDS 手册中以下页面选择适合您用例的算法

(无)

gds.

设置名称只是一个前缀,需要与您选择的过程的输入选项一起完整。

(无)

是,它与您选择的过程相关

如何在 Spark Job 中管理 gds. 前缀

例如,假设您想要投影一个图,如下所示:

---
CALL gds.graph.project(
  'myGraph',
  'Page',
  'LINKS',
  {
    relationshipProperties: 'weight'
  }
)
---

所以我们需要:

  • 调用 gds.graph.project,这将导致在我们的 Spark Job 中添加 .option("gds", "gds.graph.project")project 过程,如您在 ink:https://neo4j.ac.cn/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^] 所见,有 4 个输入参数:

    • graphName:我们想将图命名为 myGraph;这导致添加 .option("gds.graphName", "myGraph")

    • nodeProjection:我们想投影 Page 节点;这导致添加 .option("gds.nodeProjection", "Page")

    • relationshipProjection:我们想投影 LINKS 关系;这导致添加 .option("gds.relationshipProjection", "LINKS")

    • configuration:我们想将 weight 配置为定义关系重要性的属性;configuration 是一个映射,我们需要添加一个 relationshipProperties 键,其值为 weight 到我们的映射中。我们可以通过点表示法实现这一点,这导致添加 .option("gds.configuration.relationshipProperties", "weight")

所以最终的 Spark 作业将如下所示:

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("gds", "gds.graph.project") \
      .option("gds.graphName", "myGraph") \
      .option("gds.nodeProjection", "Page") \
      .option("gds.relationshipProjection", "LINKS") \
      .option("gds.configuration.relationshipProperties", "weight") \
      .load() \
      .show(truncate=False)

通过 Cypher 查询支持 GDS

在此模式下,您可以使用复杂的自定义查询,以便通过 GDS 分析数据。

示例

示例 Zeppelin Notebook 存储库中,有一个 GDS 示例可以针对 Neo4j Sandbox 运行,演示如何将两者结合使用。

使用 Spark 在 GDS 中创建虚拟图

这是一个非常简单、直接的代码;它构建了正确的 Cypher 语句,用于在 GDS 中创建虚拟图并返回结果。

%pyspark
query = """
    CALL gds.graph.project('got-interactions', 'Person', {
      INTERACTS: {
        orientation: 'UNDIRECTED'
      }
    })
    YIELD graphName, nodeCount, relationshipCount, projectMillis
    RETURN graphName, nodeCount, relationshipCount, projectMillis
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()
如果您收到“A graph with name [name] already exists”(名称为 [name] 的图已存在)错误,请查看此常见问题

确保选项 partitions 设置为 1。您不希望并行执行此查询,它应该只执行一次。

当您使用存储过程时,必须包含 RETURN 子句。

运行 GDS 分析并流式传输结果

以下示例展示了如何运行分析并将结果作为另一个 Cypher 查询获取,该查询作为 Spark 从 Neo4j 读取的操作执行。

%pyspark

query = """
    CALL gds.pageRank.stream('got-interactions')
    YIELD nodeId, score
    RETURN gds.util.asNode(nodeId).name AS name, score
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()

df.show()
确保选项 partitions 设置为 1。算法应该只执行一次。

GDS 结果的流式传输与持久化

运行 GDS 算法时,库提供了将算法结果流式传输回调用者或修改底层图的选项。将 GDS 与 Spark 结合使用提供了转换或以其他方式使用 GDS 结果的额外选项。最终,两种模式都适用于 Neo4j Connector for Apache Spark,您可以根据自己的用例选择最佳方案。

如果您的架构中 GDS 算法正在只读副本或单独的独立实例上运行,那么流式传输结果可能很方便(因为您无法将它们写入只读副本),然后使用连接器的写入功能获取该结果流,并将其写入不同的 Neo4j 连接,即写入常规的因果集群。

© . All rights reserved.