Aura 图分析无服务器

Aura 图分析无服务器是一个按需的临时计算环境,用于运行 GDS 工作负载。每个计算单元称为一个 GDS 会话。它是 Neo4j Aura 的一部分,Neo4j Aura 是一个快速、可扩展、始终在线、全自动的云图平台。

GDS 会话有三种类型

  • 附加:数据源是 Neo4j AuraDB 实例。

  • 自管理:数据源是自管理的 Neo4j DBMS。

  • 独立:数据源不基于 Neo4j。

将会话填充数据的过程称为远程投影。一旦填充,GDS 会话就可以运行 GDS 工作负载,例如算法和机器学习模型。这些计算的结果可以使用附加和自管理类型中的远程回写写回原始源。

有关现成的 Notebook,请参阅我们关于 GDS 会话的教程,适用于AuraDB自管理数据库任何其他数据源

1. GDS 会话管理

GdsSessions 对象是以下操作的 API 入口点

  • get_or_create:创建新的 GDS 会话,或连接到现有会话。

  • list:列出所有当前活动的 GDS 会话。

  • delete:删除 GDS 会话。

您需要 Neo4j Aura API 凭据(CLIENT_IDCLIENT_SECRET)来创建 GdsSessions 对象。有关如何从 Neo4j Aura 帐户创建 API 凭据的说明,请参阅 Aura 文档。如果您的 Aura 用户属于多个项目,则还必须提供所需的项目 ID。

创建 GdsSessions 对象
from graphdatascience.session import GdsSessions, AuraAPICredentials

CLIENT_ID = "my-aura-api-client-id"
CLIENT_SECRET = "my-aura-api-client-secret"
PROJECT_ID = None

# Create a new GdsSessions object
sessions = GdsSessions(api_credentials=AuraAPICredentials(CLIENT_ID, CLIENT_SECRET, PROJECT_ID))

所有可用方法和参数都列在API 参考中。

1.1. 创建 GDS 会话

要创建 GDS 会话,请使用 get_or_create() 方法。如果会话不存在,它将创建一个新会话;如果会话存在,它将连接到现有会话。如果会话选项与现有会话不同,则会抛出错误。

get_or_create() 的返回值是 AuraGraphDataScience 对象。它提供了与 GraphDataScience 对象类似的 API,但它被配置为在 GDS 会话上运行。按照惯例,始终将 gds 用作 get_or_create() 返回值的变量名。

1.1.1. 会话过期和删除

创建会话时,可以配置可选的 ttl 参数来设置不活动会话过期的时间。ttl 的默认值为 1 小时,允许的最大值为 7 天。过期会话不能用于运行工作负载,不产生任何费用,并将在 7 天后自动删除。它也可以通过 Aura Console UI 删除。

1.1.2. 最大生命周期

会话的活动时间不能超过 7 天。即使会话不会因不活动而过期,它仍将在创建 7 天后过期。这是一个硬性限制,无法更改。

1.1.3. 语法

sessions.get_or_create(
    session_name: str,
    memory: SessionMemory,
    db_connection: Optional[DbmsConnectionInfo] = None,
    ttl: Optional[timedelta] = None,
    cloud_location: Optional[CloudLocation] = None,
    timeout: Optional[int] = None,
): AuraGraphDataScience
表 1. 参数
名称 类型 可选 默认 描述

session_name

str

-

会话名称。在项目内必须唯一。

memory

SessionMemory

-

会话可用的内存量。支持的值

db_connection

DbmsConnectionInfo

Neo4j DBMS 的 Bolt 服务器 URL、用户名和密码。附加和自管理类型必需。

ttl

datetime.timedelta

1h

会话的生存时间。

cloud_location

CloudLocation

Aura 支持的云提供商和 GDS 会话将运行的区域。自管理和独立类型必需。

timeout

int

等待会话进入就绪状态的秒数。如果超时,将返回错误。

1.1.4. 示例

创建附加到 AuraDB 实例的 GDS 会话
from graphdatascience.session import DbmsConnectionInfo, SessionMemory

gds = sessions.get_or_create(
    session_name="my-attached-session",
    memory=SessionMemory.m_4GB,
    db_connection=DbmsConnectionInfo(
        "neo4j+s://mydbid.databases.neo4j.io",
        "my-user",
        "my-password"
    ),
)
为自管理 Neo4j DBMS 创建 GDS 会话
from graphdatascience.session import DbmsConnectionInfo, CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-self-managed-session",
    memory=SessionMemory.m_4GB,
    db_connection=DbmsConnectionInfo("neo4j://localhost", "my-user", "my-password"),
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
创建没有任何 Neo4j 数据库的 GDS 会话
from graphdatascience.session import CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-standalone-session",
    memory=SessionMemory.m_4GB,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

1.2. 列出 GDS 会话

list() 方法返回所有当前活动的 GDS 会话的名称和内存大小。

列出 GDS 会话
sessions.list()

1.3. 删除 GDS 会话

删除 GDS 会话将终止会话并停止任何正在累积的成本。删除会话不会影响已配置的 Neo4j 数据源。但是,任何未回写到 Neo4j 实例的数据都将丢失。

如果您有开放的会话连接

通过开放客户端连接删除 GDS 会话
gds.delete()

使用 delete() 方法删除 GDS 会话。

通过 GdsSessions 对象删除 GDS 会话
sessions.delete(session_name="my-new-session")

1.4. 估算会话内存

为了帮助确定给定工作负载的合适会话大小,有 estimate() 函数。通过提供预期的节点和关系计数以及应使用的算法类别,它将返回会话的估计大小。

通过 GdsSessions 对象估算 GDS 会话的大小
from graphdatascience.session import AlgorithmCategory

memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
)

2. 将图投影到 GDS 会话中

拥有 GDS 会话后,您可以将图投影到其中。此操作称为远程投影,因为数据源不是与数据库共同位于一处的数据库,而是远程数据库。

您可以使用 gds.graph.project() 端点创建一个远程投影,该端点带有图名称、Cypher 查询和附加可选参数。Cypher 查询必须包含 gds.graph.project.remote() 函数才能将图投影到 GDS 会话中。这只能通过附加和自管理会话完成。独立会话必须使用 graph.construct

2.1. 语法

远程投影
gds.graph.project(
    graph_name: str,
    query: str,
    job_id: Optional[str] = None,
    concurrency: int = 4,
    undirected_relationship_types: Optional[list[str]] = None,
    inverse_indexed_relationship_types: Optional[list[str]] = None,
    batch_size: Optional[int] = None,
): (Graph, Series[Any])
表 2. 参数
名称 类型 可选 默认 描述

graph_name

str

-

图的名称。

query

str

-

投影查询。

job_id

str

会话上进程的关联 ID。如果未提供,将使用自动生成的 ID。

concurrency

int

4

在会话内构建图时使用的并发性。

undirected_relationship_types

list[str]

[]

应视为无向的关系类型名称列表。

inverse_indexed_relationship_types

list[str]

[]

应反向索引的关系类型名称列表。

batch_size

int

10000

从 DBMS 传输到会话的批次大小。

表 3. 结果
名称 类型 描述

graph

Graph

表示投影图的图对象。

result

Series[Any]

关于投影的统计数据。

concurrencybatch_size 配置参数可用于调整远程投影的性能。

远程投影查询的并发性由 DBMS 服务器上的 Cypher 运行时控制。使用 CYPHER runtime=parallel 作为查询前缀以最大化性能。实际使用的并发性取决于 DBMS 服务器的可用处理器和当前操作负载。

2.1.1. 远程投影查询语法

远程投影查询支持与 Cypher 投影相同的语法,但有两个主要区别

  1. 图名称不是参数。相反,图名称提供给 gds.graph.project() 端点。

  2. 必须使用 gds.graph.project.remote() 函数,而不是 gds.graph.project() 函数。

有关如何编写 Cypher 投影查询的完整详细信息和示例,请参阅 GDS 手册中的 Cypher 投影文档

2.1.2. 关系类型无向性和反向索引

可选参数 undirectedRelationshipTypesinverseIndexedRelationshipTypes 用于配置关系的无向性和反向索引。它们的行为与 GDS 手册中记录的行为相同。

2.2. 示例

此示例展示了如何将图投影到 GDS 会话中。示例图是异构的,并建模了用户和产品。用户可以相互认识,用户可以购买产品。

附加和自管理示例使用 Cypher 查询来填充数据库数据。独立示例则使用 pandas DataFrames。

在 Neo4j DBMS 中创建一些数据并将其投影到附加的 GDS 会话
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials

sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))

db_connection = DbmsConnectionInfo(os.environ["DB_URI"], os.environ["DB_USER"], os.environ["DB_PASSWORD"])
gds = sessions.get_or_create(
    session_name="my-new-session",
    memory=SessionMemory.m_8GB,
    db_connection=db_connection,
)

gds.run_cypher(
    """
    CREATE
     (u1:User {name: 'Mats'}),
     (u2:User {name: 'Florentin'}),
     (p1:Product {name: 'ice cream', cost: 4.2}),
     (p2:Product {name: 'computer', cost: 13.37})

    CREATE
     (u1)-[:KNOWS {since: 2020}]->(u2),
     (u2)-[:BOUGHT {price: 7474}]->(p1),
     (u1)-[:BOUGHT {price: 1337}]->(p2)
    """
)

G, result = gds.graph.project(
    graph_name="my-graph",
    query="""
    CALL {
        MATCH (u1:User)
        OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
        RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
        UNION
        MATCH (p:Product)
        OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
        RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel),
      relationshipProperties: properties(rel)
    })
    """,
)
在 Neo4j DBMS 中创建一些数据并将其投影到自管理的 GDS 会话
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials, CloudLocation

sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))

db_connection = DbmsConnectionInfo(os.environ["DB_URI"], os.environ["DB_USER"], os.environ["DB_PASSWORD"])
gds = sessions.get_or_create(
    session_name="my-new-session",
    memory=SessionMemory.m_8GB,
    db_connection=db_connection,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

gds.run_cypher(
    """
    CREATE
     (u1:User {name: 'Mats'}),
     (u2:User {name: 'Florentin'}),
     (p1:Product {name: 'ice cream', cost: 4.2}),
     (p2:Product {name: 'computer', cost: 13.37})

    CREATE
     (u1)-[:KNOWS {since: 2020}]->(u2),
     (u2)-[:BOUGHT {price: 7474}]->(p1),
     (u1)-[:BOUGHT {price: 1337}]->(p2)
    """
)

G, result = gds.graph.project(
    graph_name="my-graph",
    query="""
    CALL {
        MATCH (u1:User)
        OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
        RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
        UNION
        MATCH (p:Product)
        OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
        RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel),
      relationshipProperties: properties(rel)
    })
    """,
)
将一些数据投影到独立的 GDS 会话
from graphdatascience.session import CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-standalone-session",
    memory=SessionMemory.m_4GB,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

nodes = [pandas.DataFrame({
        "nodeId": [0, 1],
        "labels":  ["Person", "Person"],
    }), pandas.DataFrame({
        "nodeId": [2, 3],
        "labels":  ["Product", "Product"],
        "cost": [4.2, 13.37],
    })
]

relationships = [pandas.DataFrame({
        "sourceNodeId": [0],
        "targetNodeId": [1],
        "relationshipType": ["KNOWS"],
        "since": [2020]
    }), pandas.DataFrame({
        "sourceNodeId": [0, 1],
        "targetNodeId": [3, 2],
        "relationshipType": ["BOUGHT", "BOUGHT"],
        "price": [1337, 7474]
    })
]

G = gds.graph.construct(
    "my-graph",
    nodes,
    relationships
)

3. 运行算法

您可以在远程投影图上运行算法,方式与在任何投影图上运行算法相同。例如,您可以在上一示例的投影图上运行 PageRank 和 FastRP 算法,如下所示

运行算法并流式传输结果
gds.pageRank.mutate(G, mutateProperty="pr")
gds.fastRP.mutate(G, featureProperties=["pr"], embeddingDimension=2, nodeSelfInfluence=0.1, mutateProperty="embedding")

# Stream the results back together with the `name` property fetched from the database
gds.graph.nodeProperties.stream(G, db_node_properties=["name"], node_properties=["pr", "embedding"])

有关可用算法的完整列表,请参阅API 参考

3.1. 限制

  • 模型目录受限制支持

    • 训练好的模型只能用于在其训练的同一会话中进行预测。会话删除后,所有训练好的模型都将丢失。

    • 不支持模型发布,包括

      • gds.model.publish

    • 不支持模型持久化,包括

      • gds.model.store

      • gds.model.load

      • gds.model.delete

  • 不支持拓扑链接预测算法,包括

    • gds.alpha.linkprediction.adamicAdar

    • gds.alpha.linkprediction.commonNeighbors

    • gds.alpha.linkprediction.preferentialAttachment

    • gds.alpha.linkprediction.resourceAllocation

    • gds.alpha.linkprediction.sameCommunity

    • gds.alpha.linkprediction.totalNeighbors

4. 远程回写

将 GDS 会话中完成的计算结果持久化,其方式因会话类型而异。附加和自管理会话内置支持将算法结果写回图投影的同一 Neo4j 数据库。独立会话的用户必须将结果流回客户端,并且用户必须将其持久化到其目标系统中。本节将说明内置的远程回写功能。

默认情况下,回写将并发发生,每个批次一个事务。该行为由三个方面控制

  • 数据集的大小(例如,节点计数或关系计数)

  • 配置的批次大小

  • 配置的并发性

4.1. 语法

附加和自管理会话的远程回写语法是相同的。

远程图回写
gds.graph.<operation>.write(
    graph_name: str,
    # additional parameters,
    **config: Any,
): Series[Any]
远程算法回写
gds.<algo>.write(
    graph_name: str,
    **config: Any,
): Series[Any]

所有回写端点都支持以下附加配置

表 4. 参数
名称 可选 默认 描述

concurrency

dynamic [1]

用于回写到 DBMS 的并发性。

arrowConfiguration

-

包含从 DBMS 到 GDS Arrow Server 的连接的附加配置的字典。

1. DBMS 服务器处理器数量的两倍

表 5. Arrow 配置
名称 可选 默认 描述

batchSize

10000

DBMS 从会话中检索的批次大小。

4.2. 示例

扩展前面的示例,我们可以将 FastRP 嵌入写回 Neo4j 数据库,如下所示

将修改后的 FastRP 嵌入写回数据库
gds.graph.nodeProperties.write(G, "embedding")

如果我们要调整回写的性能,可以配置 batchSizeconcurrency。在此示例中,我们展示了如何使用算法 .write 模式来完成此操作

计算 WCC 并将组件 ID 作为节点属性写回,并带有自定义并发配置
gds.wcc.write(
  G,
  writeProperty="wcc",
  concurrency=12,
  arrowConfiguration={"batchSize": 25000}
)

5. 查询数据库

您可以使用 run_cypher() 方法对 Neo4j 数据库运行 Cypher 查询。对可以运行的查询类型没有限制,但重要的是要注意查询将在 Neo4j 数据库上运行,而不是在 GDS 会话上运行。

如果您想使用 Cypher 操作图分析无服务器,请使用 Cypher API
运行 Cypher 查询以查找我们回写的嵌入
gds.run_cypher("MATCH (n:User) RETURN n.name, n.embedding")