横向扩展作业
Snowflake 专用 Neo4j 图分析旨在运行于 Snowpark Container Services,这允许您将作业横向扩展到多个计算节点。这对于以下两种场景尤其有用:
-
多个用户同时使用应用程序。
-
单个用户同时运行多个作业。
这两种场景都由应用程序以相同的方式处理。算法在计算池中的一个计算节点上执行。计算池根据传递给算法的选择器进行选择。Snowflake 根据池中运行的作业数量自动扩展或缩减计算池。
多个用户同时使用应用程序
多个用户可以在同一个计算池或不同的计算池上并行运行作业。应用程序会自动处理作业在计算节点上的分布。
下图显示了在不同计算池上执行多个作业的示例。计算池 CPU_X64_M
和 GPU_NV_S
配置为最多使用两个计算节点,而 CPU_X64_L
配置为最多使用一个计算节点。

根据上图,一个可能的多用户场景可能如下所示:
Alice> CALL graph.wcc('CPU_X64_M', <configuration>); -- Job 1
Bob> CALL graph.page_rank('CPU_X64_M', <configuration>); -- Job 2
Charlie> CALL graph.triangle_count('GPU_X64_L', <configuration>); -- Job 3
David> CALL graph.louvain('CPU_X64_L', <configuration>); -- Job 4
Eve> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>); -- Job 5
Frank> CALL graph.gs_nc_train('GPU_NV_S', <configuration>); -- Job 6
作业 1 和作业 2 都安排在 CPU_X64_M
计算池上。由于此池有两个计算节点,它们可以同时运行。作业 3 和作业 4 安排在 GPU_X64_L
计算池上。由于只有一个计算节点的限制,作业 4 必须等待作业 3 完成。作业 5 和作业 6 安排在 GPU_NV_S
计算池上,可以并行运行。
单个用户同时运行多个作业
与多用户场景类似,单个用户可以在同一个计算池或不同的计算池上并行运行多个作业。
一般来说,上面的例子也适用于单个用户:
Alice> CALL graph.wcc('CPU_X64_M', <configuration>); -- Job 1
Alice> CALL graph.page_rank('CPU_X64_M', <configuration>); -- Job 2
Alice> CALL graph.triangle_count('GPU_X64_L', <configuration>); -- Job 3
Alice> CALL graph.louvain('CPU_X64_L', <configuration>); -- Job 4
Alice> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>); -- Job 5
Alice> CALL graph.gs_nc_train('GPU_NV_S', <configuration>); -- Job 6
作为单个用户并行执行多个作业可以通过多种方式实现。我们将给出一个在 Snowpark Python 工作表中演示的示例。其他并行运行多个作业的方式也是可能的,例如,使用 Snowflake CLI 或 Snowflake Python 连接器。
示例
在以下示例中,我们将使用 Snowpark Python API 并行运行多个作业。关键概念是使用 collect_nowait() 调用来运行作业并立即返回一个 future 对象。该示例可以适应自定义消费者表并在 Snowpark Python 工作表中执行。我们将在 CPU_X64_M
计算池上并行运行三个作业。每个作业都会将其结果写入一个单独的输出表中。
import snowflake.snowpark as snowpark
from json import loads
def main(session: snowpark.Session):
"""
This example shows how to run multiple Neo4j Graph Analytics jobs in parallel.
The code can be run in a Snowflake worksheet.
The return type of the worksheet must be set to `VARIANT`.
"""
# The name of the application.
app_name = 'Neo4j_Graph_Analytics'
# The qualified name of the schema where the consumer data is stored.
consumer_schema = 'consumer_db.data_schema'
# The compute pool to execute the jobs.
compute_pool_selector = 'CPU_X64_M'
# The minimum and maximum number of compute nodes in the compute pool.
min_nodes = max_nodes = 3
# Make sure the compute pool is able to scale out compute jobs.
# Note, that this command requires `app_admin` role.
session.sql(f"CALL {app_name}.admin.set_max_nodes('{compute_pool_selector}', {max_nodes})").collect()
session.sql(f"CALL {app_name}.admin.set_min_nodes('{compute_pool_selector}', {min_nodes})").collect()
# Configure table to graph projection.
# We assume two node tables and one relationship table.
# * consumer_db.data_schema.Nodes_A
# * consumer_db.data_schema.Nodes_B
# * consumer_db.data_schema.Relationships
project_cfg = {
'nodeTables': ['Nodes_A', 'Nodes_B'],
'relationshipTables': {
'Relationships': {
'sourceTable': 'Nodes_A',
'targetTable': 'Nodes_B'
}
}
}
# Prepare procedure calls for three Neo4j Graph Analytics algorithms.
wcc = f"""
CALL {app_name}.graph.wcc('{compute_pool_selector}',
{{
'defaultTablePrefix': {consumer_schema},
'project': {project_cfg},
'compute': {{}},
'write': [{{
'nodeLabel': 'Nodes_A',
'outputTable': 'nodes_A_components'
}}]
}})"""
page_rank = f"""
CALL {app_name}.graph.page_rank('{compute_pool_selector}',
{{
'defaultTablePrefix': {consumer_schema},
'project': {project_cfg},
'compute': {{ 'dampingFactor': 0.87 }},
'write': [{{
'nodeLabel': 'Nodes_B',
'outputTable': 'nodes_B_ranks'
}}]
}})"""
node_similarity = f"""
CALL {app_name}.graph.node_similarity('{compute_pool_selector}',
{{
'defaultTablePrefix': {consumer_schema},
'project': {project_cfg},
'compute': {{ 'topK': 5 }},
'write': [{{
'sourceLabel': 'Nodes_A',
'targetLabel': 'Nodes_A',
'outputTable': 'nodes_A_similarities'
}}]
}})"""
# Start three jobs in parallel.
# `collect_nowait` will return immediately, but the jobs will continue to run.
wcc_result = session.sql(wcc).collect_nowait()
page_rank_result = session.sql(page_rank).collect_nowait()
node_similarity_result = session.sql(node_similarity).collect_nowait()
# Wait for the jobs to finish and collect the results.
wcc_result = loads(wcc_result.result()[0]["JOB_RESULT"])["wcc_1"]
page_rank_result = loads(page_rank_result.result()[0]["JOB_RESULT"])["page_rank_1"]
node_similarity_result = loads(node_similarity_result.result()[0]["JOB_RESULT"])["node_similarity_1"]
# Return meta data about the three jobs.
# Note, that each job also produces a result table.
return {
'wcc_components': wcc_result["componentCount"],
'page_rank_iterations': page_rank_result["ranIterations"],
'node_sim_comparisons': node_similarity_result["nodesCompared"]
}