横向扩展作业

Snowflake 专用 Neo4j 图分析旨在运行于 Snowpark Container Services,这允许您将作业横向扩展到多个计算节点。这对于以下两种场景尤其有用:

  1. 多个用户同时使用应用程序。

  2. 单个用户同时运行多个作业。

这两种场景都由应用程序以相同的方式处理。算法在计算池中的一个计算节点上执行。计算池根据传递给算法的选择器进行选择。Snowflake 根据池中运行的作业数量自动扩展或缩减计算池。

注意事项

最小的工作单元是作业,即一个项目-计算-写入的执行。应用程序不会自动将作业拆分为更小的工作单元并将其分配到多个计算节点。根据图大小和算法,请考虑为您的作业选择正确的计算池选择器

管理计算池中所述,计算池配置有最小和最大计算节点数。当算法执行时,Snowflake 会自动将作业放置在正在运行的节点上,或在必要时启动新节点。请考虑根据您的需求调整计算池中的最小和最大节点数。有关使用计算池的更多信息,请查阅 Snowflake 文档。

多个用户同时使用应用程序

多个用户可以在同一个计算池或不同的计算池上并行运行作业。应用程序会自动处理作业在计算节点上的分布。

下图显示了在不同计算池上执行多个作业的示例。计算池 CPU_X64_MGPU_NV_S 配置为最多使用两个计算节点,而 CPU_X64_L 配置为最多使用一个计算节点。

Visualization of the example graph

根据上图,一个可能的多用户场景可能如下所示:

  Alice> CALL graph.wcc('CPU_X64_M', <configuration>);            -- Job 1
    Bob> CALL graph.page_rank('CPU_X64_M', <configuration>);      -- Job 2
Charlie> CALL graph.triangle_count('GPU_X64_L', <configuration>); -- Job 3
  David> CALL graph.louvain('CPU_X64_L', <configuration>);        -- Job 4
    Eve> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>);   -- Job 5
  Frank> CALL graph.gs_nc_train('GPU_NV_S', <configuration>);     -- Job 6

作业 1 和作业 2 都安排在 CPU_X64_M 计算池上。由于此池有两个计算节点,它们可以同时运行。作业 3 和作业 4 安排在 GPU_X64_L 计算池上。由于只有一个计算节点的限制,作业 4 必须等待作业 3 完成。作业 5 和作业 6 安排在 GPU_NV_S 计算池上,可以并行运行。

单个用户同时运行多个作业

与多用户场景类似,单个用户可以在同一个计算池或不同的计算池上并行运行多个作业。

一般来说,上面的例子也适用于单个用户:

Alice> CALL graph.wcc('CPU_X64_M', <configuration>);            -- Job 1
Alice> CALL graph.page_rank('CPU_X64_M', <configuration>);      -- Job 2
Alice> CALL graph.triangle_count('GPU_X64_L', <configuration>); -- Job 3
Alice> CALL graph.louvain('CPU_X64_L', <configuration>);        -- Job 4
Alice> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>);   -- Job 5
Alice> CALL graph.gs_nc_train('GPU_NV_S', <configuration>);     -- Job 6

作为单个用户并行执行多个作业可以通过多种方式实现。我们将给出一个在 Snowpark Python 工作表中演示的示例。其他并行运行多个作业的方式也是可能的,例如,使用 Snowflake CLISnowflake Python 连接器

示例

在以下示例中,我们将使用 Snowpark Python API 并行运行多个作业。关键概念是使用 collect_nowait() 调用来运行作业并立即返回一个 future 对象。该示例可以适应自定义消费者表并在 Snowpark Python 工作表中执行。我们将在 CPU_X64_M 计算池上并行运行三个作业。每个作业都会将其结果写入一个单独的输出表中。

import snowflake.snowpark as snowpark
from json import loads

def main(session: snowpark.Session):
    """
    This example shows how to run multiple Neo4j Graph Analytics jobs in parallel.
    The code can be run in a Snowflake worksheet.
    The return type of the worksheet must be set to `VARIANT`.
    """
    # The name of the application.
    app_name = 'Neo4j_Graph_Analytics'
    # The qualified name of the schema where the consumer data is stored.
    consumer_schema = 'consumer_db.data_schema'
    # The compute pool to execute the jobs.
    compute_pool_selector = 'CPU_X64_M'
    # The minimum and maximum number of compute nodes in the compute pool.
    min_nodes = max_nodes = 3

    # Make sure the compute pool is able to scale out compute jobs.
    # Note, that this command requires `app_admin` role.
    session.sql(f"CALL {app_name}.admin.set_max_nodes('{compute_pool_selector}', {max_nodes})").collect()
    session.sql(f"CALL {app_name}.admin.set_min_nodes('{compute_pool_selector}', {min_nodes})").collect()

    # Configure table to graph projection.
    # We assume two node tables and one relationship table.
    # * consumer_db.data_schema.Nodes_A
    # * consumer_db.data_schema.Nodes_B
    # * consumer_db.data_schema.Relationships
    project_cfg = {
        'nodeTables': ['Nodes_A', 'Nodes_B'],
        'relationshipTables': {
            'Relationships': {
                'sourceTable': 'Nodes_A',
                'targetTable': 'Nodes_B'
            }
        }
    }

    # Prepare procedure calls for three Neo4j Graph Analytics algorithms.
    wcc = f"""
        CALL {app_name}.graph.wcc('{compute_pool_selector}',
        {{
            'defaultTablePrefix': {consumer_schema},
            'project': {project_cfg},
            'compute': {{}},
            'write': [{{
                'nodeLabel': 'Nodes_A',
                'outputTable': 'nodes_A_components'
            }}]
        }})"""
    page_rank = f"""
        CALL {app_name}.graph.page_rank('{compute_pool_selector}',
        {{
            'defaultTablePrefix': {consumer_schema},
            'project': {project_cfg},
            'compute': {{ 'dampingFactor': 0.87 }},
            'write': [{{
                'nodeLabel': 'Nodes_B',
                'outputTable': 'nodes_B_ranks'
            }}]
        }})"""
    node_similarity = f"""
        CALL {app_name}.graph.node_similarity('{compute_pool_selector}',
        {{
            'defaultTablePrefix': {consumer_schema},
            'project': {project_cfg},
            'compute': {{ 'topK': 5 }},
            'write': [{{
                'sourceLabel': 'Nodes_A',
                'targetLabel': 'Nodes_A',
                'outputTable': 'nodes_A_similarities'
            }}]
        }})"""

    # Start three jobs in parallel.
    # `collect_nowait` will return immediately, but the jobs will continue to run.
    wcc_result = session.sql(wcc).collect_nowait()
    page_rank_result = session.sql(page_rank).collect_nowait()
    node_similarity_result = session.sql(node_similarity).collect_nowait()

    # Wait for the jobs to finish and collect the results.
    wcc_result = loads(wcc_result.result()[0]["JOB_RESULT"])["wcc_1"]
    page_rank_result = loads(page_rank_result.result()[0]["JOB_RESULT"])["page_rank_1"]
    node_similarity_result = loads(node_similarity_result.result()[0]["JOB_RESULT"])["node_similarity_1"]

    # Return meta data about the three jobs.
    # Note, that each job also produces a result table.
    return {
        'wcc_components': wcc_result["componentCount"],
        'page_rank_iterations': page_rank_result["ranIterations"],
        'node_sim_comparisons': node_similarity_result["nodesCompared"]
    }
© . All rights reserved.