Monitoring the progress of a running algorithm

Follow along with a notebook in Google Colab.

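Running algorithms on large graphs can be computationally expensive. This example shows how to use the gds.beta.listProgress procedure to monitor an algorithm's progress, both to gauge how fast it is processing and to find out when the computation has finished.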

Setup

For more information on how to get started using Python, refer to the Connecting with Python tutorial.

pip install graphdatascience
# Import the client
from graphdatascience import GraphDataScience

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Configure the client with AuraDS-recommended settings
gds = GraphDataScience(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD),
    aura_ds=True
)

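In the following code examples we use the print function to print Pandas DataFrame and Series objects. You can try different ways to print a Pandas object, for instance via the to_string and to_json methods. If you use a JSON representation, in some cases you may need to include a default handler to deal with Neo4j DateTime objects. Check the Python connection section for some examples.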
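A minimal sketch of such a default handler, using only the standard library: json.dumps calls the function passed as default for every value it cannot serialize natively. This version treats any object with an isoformat() method as a date/time value (the driver helper later in this section checks specifically for neo4j.time.DateTime), uses Python's own datetime as a stand-in for a Neo4j DateTime, and raises TypeError for anything else, which is what json.dumps expects. The record and its creationTime field are hypothetical.

```python
import json
from datetime import datetime, timezone


def default(o):
    """Fallback serializer for json.dumps.

    Assumption: objects exposing isoformat() (Python's datetime, and Neo4j's
    neo4j.time.DateTime) are rendered as ISO-8601 strings.
    """
    if hasattr(o, "isoformat"):
        return o.isoformat()
    # json.dumps expects a TypeError for values it cannot serialize
    raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")


# Hypothetical record containing a date/time value
record = {
    "name": "example-graph",
    "creationTime": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
}
print(json.dumps(record, indent=2, sort_keys=True, default=default))
```

Raising instead of returning None matters: a handler that silently returns None makes json.dumps emit null for unknown types, hiding the problem.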

For more information on how to get started using Cypher Shell, refer to the Neo4j Cypher Shell tutorial.

Run the following commands from the directory where Cypher Shell is installed.
export AURA_CONNECTION_URI="neo4j+s://xxxxxxxx.databases.neo4j.io"
export AURA_USERNAME="neo4j"
export AURA_PASSWORD=""

./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD

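To use the Neo4j Python driver instead of the GDS client, refer to the Connecting with Python tutorial for more information on getting started.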

pip install neo4j
# Import the driver
from neo4j import GraphDatabase

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Instantiate the driver
driver = GraphDatabase.driver(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD)
)
# Import to prettify results
import json

# Import for the JSON helper function
from neo4j.time import DateTime

# Helper function for serializing Neo4j DateTime in JSON dumps
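def default(o):
    if isinstance(o, DateTime):
        return o.isoformat()
    # Any other unsupported type: raise TypeError, as json.dumps expects
    raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")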

Create an example graph

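The GDS graph generation algorithm makes it easy to create an in-memory graph. Given the number of nodes, the average number of outgoing relationships per node, and a relationship distribution function, the algorithm creates a graph with the following shape: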

(:1000000_Nodes)-[:REL]->(:1000000_Nodes)
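With a given average degree, the expected number of relationships is the node count times the average degree, so 1,000,000 nodes with an average of 3 outgoing relationships give roughly 3,000,000 relationships. The toy generator below illustrates the simplest case, where every node gets exactly the same number of outgoing REL relationships to random targets. It is a hypothetical pure-Python illustration, not how gds.beta.graph.generate works internally; with POWER_LAW the per-node degree varies, while the average should stay close to the requested value.

```python
import random


def generate_fixed_degree_graph(node_count, average_degree, seed=42):
    """Toy generator: each node gets exactly `average_degree` outgoing REL edges.

    Targets are drawn uniformly at random; self-loops and duplicates are allowed.
    Hypothetical illustration only, not the GDS implementation.
    """
    rng = random.Random(seed)
    relationships = []
    for source in range(node_count):
        for _ in range(average_degree):
            relationships.append((source, rng.randrange(node_count)))
    return relationships


rels = generate_fixed_degree_graph(node_count=1_000, average_degree=3)
print(len(rels))  # → 3000, i.e. node_count * average_degree
```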

# Run the graph generation algorithm and retrieve the corresponding
# graph object and call result metadata
g, result = gds.beta.graph.generate(
    "example-graph",
    1000000,
    3,
    relationshipDistribution="POWER_LAW"
)

# Print prettified graph stats
print(result)
CALL gds.beta.graph.generate(
  'example-graph',
  1000000,
  3,
  {relationshipDistribution: 'POWER_LAW'}
)
YIELD name,
  nodes,
  relationships,
  generateMillis,
  relationshipSeed,
  averageDegree,
  relationshipDistribution,
  relationshipProperty
RETURN *
# Cypher query
create_example_graph_query = """
    CALL gds.beta.graph.generate(
      'example-graph',
      1000000,
      3,
      {relationshipDistribution: 'POWER_LAW'}
    )
    YIELD name,
      nodes,
      relationships,
      generateMillis,
      relationshipSeed,
      averageDegree,
      relationshipDistribution,
      relationshipProperty
    RETURN *
"""

# Create the driver session
with driver.session() as session:
    # Run query
    result = session.run(create_example_graph_query).data()

    # Prettify the result
    print(json.dumps(result, indent=2, sort_keys=True, default=default))

Run the algorithm and check its progress

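We need an algorithm that takes some time to converge. In this example we use the Label Propagation algorithm, which we start in a separate thread so that we can check its progress from the same Python process.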
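To make "converge" concrete, the toy implementation below runs asynchronous label propagation on a small undirected graph: every node starts in its own community, then repeatedly adopts the most frequent label among its neighbours until a full pass changes nothing. It returns values analogous to the ranIterations and didConverge fields that gds.labelPropagation.mutate yields. This is a hypothetical sketch for intuition only; GDS's implementation, node ordering, tie-breaking and default iteration limit may differ (here nodes are visited in sorted order for determinism, and max_iterations is an arbitrary choice).

```python
from collections import Counter


def label_propagation(adjacency, max_iterations=10):
    """Toy asynchronous label propagation on an undirected graph.

    adjacency maps each node to a list of its neighbours.
    Returns (labels, ran_iterations, did_converge).
    """
    labels = {node: node for node in adjacency}  # every node starts alone
    for iteration in range(1, max_iterations + 1):
        changed = False
        for node in sorted(adjacency):
            neighbours = adjacency[node]
            if not neighbours:
                continue  # isolated nodes keep their own label
            counts = Counter(labels[n] for n in neighbours)
            top = max(counts.values())
            best = {label for label, count in counts.items() if count == top}
            # Keep the current label on a tie, otherwise take the smallest best label
            if labels[node] not in best:
                labels[node] = min(best)
                changed = True
        if not changed:
            return labels, iteration, True  # a full pass changed nothing
    return labels, max_iterations, False


# Two disjoint triangles: 0-1-2 and 3-4-5
two_triangles = {0: [1, 2], 1: [0, 2], 2: [0, 1], 3: [4, 5], 4: [3, 5], 5: [3, 4]}
labels, ran_iterations, did_converge = label_propagation(two_triangles)
print("communities:", len(set(labels.values())))
print("ranIterations:", ran_iterations, "didConverge:", did_converge)
```

On a graph with a million nodes each pass touches every relationship, which is why the real algorithm runs long enough to be worth monitoring.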

# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time


# Method to call the label propagation algorithm from a thread
def run_label_prop():
    print("Running label propagation")

    result = gds.labelPropagation.mutate(
        g,
        mutateProperty="communityID"
    )

    print(result)


# Method to get and pretty-print the algorithm progress
def run_list_progress():
    result = gds.beta.listProgress()

    print(result)


# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()

# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)

# Check the algorithm progress
run_list_progress()

# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)

# Check the algorithm progress again
run_list_progress()

# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()
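Two progress samples, such as the two listProgress checks taken roughly 5 and 15 seconds after starting the algorithm, are enough for a rough estimate of the remaining time. The helper below (a hypothetical name) extrapolates linearly, so it assumes a constant processing rate, which iterative algorithms often do not have; treat the result as a ballpark figure. The progress values in the example are made up.

```python
def estimate_remaining_seconds(t1, p1, t2, p2):
    """Linear estimate of the time left until a task reaches 100 %.

    t1, t2: sample times in seconds (t2 > t1); p1, p2: progress in percent.
    Assumes a constant rate. Returns None if no progress was made.
    """
    if t2 <= t1:
        raise ValueError("t2 must be later than t1")
    rate = (p2 - p1) / (t2 - t1)  # percent per second
    if rate <= 0:
        return None
    return (100.0 - p2) / rate


# Hypothetical readings taken 5 s and 15 s after starting the algorithm
print(estimate_remaining_seconds(5, 20.0, 15, 60.0))  # → 10.0
```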
CALL gds.labelPropagation.mutate(
  'example-graph',
  {mutateProperty: 'communityID'}
)
YIELD preProcessingMillis,
  computeMillis,
  mutateMillis,
  postProcessingMillis,
  nodePropertiesWritten,
  communityCount,
  ranIterations,
  didConverge,
  communityDistribution,
  configuration
RETURN *

// The following query has to be run in another Cypher shell, so run this command
// in a different terminal first:
//
// ./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD

CALL gds.beta.listProgress()
YIELD jobId, taskName, progress, progressBar
RETURN *
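The progress column is reported as text. A minimal sketch for turning it into a number, assuming values look like "42.5%" and that tasks which cannot report progress show a placeholder such as "n/a" (check this against the output of your GDS version). parse_progress is a hypothetical helper.

```python
def parse_progress(progress):
    """Convert a listProgress `progress` value such as "42.5%" to a float.

    Assumption: the value is a percentage string, or a placeholder such as
    "n/a" when the task cannot report progress; placeholders map to None.
    """
    text = str(progress).strip()
    if not text.endswith("%"):
        return None
    try:
        return float(text[:-1])
    except ValueError:
        return None


for raw in ["42.5%", "100%", "n/a"]:
    print(raw, "->", parse_progress(raw))
```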
# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time


# Method to call the label propagation algorithm from a thread
def run_label_prop():
    label_prop_mutate_example_graph_query = """
        CALL gds.labelPropagation.mutate(
          'example-graph',
          {mutateProperty: 'communityID'}
        )
        YIELD preProcessingMillis,
          computeMillis,
          mutateMillis,
          postProcessingMillis,
          nodePropertiesWritten,
          communityCount,
          ranIterations,
          didConverge,
          communityDistribution,
          configuration
        RETURN *
    """

    # Create the driver session
    with driver.session() as session:
        # Run query
        print("Running label propagation")
        results = session.run(label_prop_mutate_example_graph_query).data()
        # Prettify the first result
        print(json.dumps(results[0], indent=2, sort_keys=True, default=default))


# Method to get and pretty-print the algorithm progress
def run_list_progress():
    gds_list_progress_query = """
        CALL gds.beta.listProgress()
        YIELD jobId, taskName, progress, progressBar
        RETURN *
    """

    # Create the driver session
    with driver.session() as session:
        # Run query
        print('running list progress')
        results = session.run(gds_list_progress_query).data()
        # Prettify all results (the list is empty when no task is running)
        print('list progress results: ')
        print(json.dumps(results, indent=2, sort_keys=True, default=default))


# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()

# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)

# Check the algorithm progress
run_list_progress()

# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)

# Check the algorithm progress again
run_list_progress()

# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()
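The fixed 5 and 10 second sleeps above check progress exactly twice, whether the algorithm finishes after 3 seconds or after 3 minutes. A sketch of a more general loop, using only the standard library: it waits on the worker thread with join(timeout=...) and calls a progress callback each time the timeout expires while the thread is still alive. poll_while_running is a hypothetical helper, and the demo uses time.sleep as a stand-in for the label propagation call.

```python
import threading
import time


def poll_while_running(worker, check_progress, interval=5.0):
    """Call check_progress() every `interval` seconds while `worker` is alive.

    worker: an already started threading.Thread.
    check_progress: any zero-argument callable.
    Returns the number of progress checks performed.
    """
    checks = 0
    # join(timeout) returns as soon as the thread ends, or after `interval` seconds
    worker.join(timeout=interval)
    while worker.is_alive():
        check_progress()
        checks += 1
        worker.join(timeout=interval)
    return checks


# Demo: time.sleep stands in for the long-running algorithm call
demo_worker = threading.Thread(target=time.sleep, args=(0.5,))
demo_worker.start()
n_checks = poll_while_running(demo_worker, lambda: print("still running"), interval=0.1)
print("progress checks:", n_checks)
```

With the code above, poll_while_running(label_prop_query_thread, run_list_progress, interval=5.0) could replace the sleep, check and join sequence. Waiting with join(timeout) rather than time.sleep lets the loop return as soon as the thread finishes instead of sitting out a full interval.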

Cleanup

The in-memory graph can now be dropped.

result = gds.graph.drop(g)

print(result)
CALL gds.graph.drop('example-graph')
delete_example_in_memory_graph_query = """
CALL gds.graph.drop('example-graph')
"""

with driver.session() as session:
    # Run query
    results = session.run(delete_example_in_memory_graph_query).data()

    # Prettify the results
    print(json.dumps(results, indent=2, sort_keys=True, default=default))

Closing the connection

The connection should always be closed when it is no longer needed.

Although the GDS client closes the connection automatically when the object is deleted, it is good practice to close it explicitly.
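One way to make sure close() runs even when an error interrupts the code is contextlib.closing from the standard library, which works with any object that has a close() method, such as the GDS client and the Neo4j driver used above. In the sketch below, FakeConnection is a hypothetical stand-in so the example runs without a database.

```python
from contextlib import closing


class FakeConnection:
    """Hypothetical stand-in for a GraphDataScience client or Neo4j driver."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# closing() calls .close() when the block exits, even if an exception is raised
with closing(FakeConnection()) as conn:
    print("connection open:", not conn.closed)

print("connection closed:", conn.closed)
```

For the real client this would read with closing(GraphDataScience(AURA_CONNECTION_URI, auth=(AURA_USERNAME, AURA_PASSWORD), aura_ds=True)) as gds: followed by the indented notebook code.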

# Close the client connection
gds.close()
# Close the driver connection
driver.close()

References

Cypher

  • Learn more about the Cypher syntax

  • You can use the Cypher Cheat Sheet as a reference for all available Cypher features