监控正在运行的算法的进度
在 |
在大型图上运行算法在计算上可能很昂贵。此示例展示了如何使用 gds.beta.listProgress
过程来监控算法的进度,这既可以了解处理速度,也可以确定计算何时完成。
设置
有关如何开始使用 Python 的更多信息,请参阅 使用 Python 连接 教程。
pip install graphdatascience
# Import the client
from graphdatascience import GraphDataScience
# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""
# Configure the client with AuraDS-recommended settings
gds = GraphDataScience(
AURA_CONNECTION_URI,
auth=(AURA_USERNAME, AURA_PASSWORD),
aura_ds=True
)
有关如何开始使用 Cypher Shell 的更多信息,请参阅 Neo4j Cypher Shell 教程。
从安装 Cypher shell 的目录运行以下命令。 |
export AURA_CONNECTION_URI="neo4j+s://xxxxxxxx.databases.neo4j.io"
export AURA_USERNAME="neo4j"
export AURA_PASSWORD=""
./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD
有关如何开始使用 Python 的更多信息,请参阅 使用 Python 连接 教程。
pip install neo4j
# Import the driver
from neo4j import GraphDatabase
# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""
# Instantiate the driver
driver = GraphDatabase.driver(
AURA_CONNECTION_URI,
auth=(AURA_USERNAME, AURA_PASSWORD)
)
# Import to prettify results
import json
# Import for the JSON helper function
from neo4j.time import DateTime
# Helper function for serializing Neo4j DateTime in JSON dumps
def default(o):
if isinstance(o, (DateTime)):
return o.isoformat()
创建示例图
通过 GDS 图生成 算法,可以轻松创建内存中的图。通过指定节点数量、每个节点平均发出的关系数量以及关系分布函数,该算法将创建具有以下形状的图
(:1000000_Nodes)-[:REL]→(:1000000_Nodes)
# Run the graph generation algorithm and retrieve the corresponding
# graph object and call result metadata
g, result = gds.beta.graph.generate(
"example-graph",
1000000,
3,
relationshipDistribution="POWER_LAW"
)
# Print prettified graph stats
print(result)
CALL gds.beta.graph.generate(
'example-graph',
1000000,
3,
{relationshipDistribution: 'POWER_LAW'}
)
YIELD name,
nodes,
relationships,
generateMillis,
relationshipSeed,
averageDegree,
relationshipDistribution,
relationshipProperty
RETURN *
# Cypher query
create_example_graph_query = """
CALL gds.beta.graph.generate(
'example-graph',
1000000,
3,
{relationshipDistribution: 'POWER_LAW'}
)
YIELD name,
nodes,
relationships,
generateMillis,
relationshipSeed,
averageDegree,
relationshipDistribution,
relationshipProperty
RETURN *
"""
# Create the driver session
with driver.session() as session:
# Run query
result = session.run(create_example_graph_query).data()
# Prettify the result
print(json.dumps(result, indent=2, sort_keys=True, default=default))
运行算法并检查进度
我们需要运行一个需要一些时间才能收敛的算法。在本示例中,我们使用标签传播算法,该算法在单独的线程中启动,以便我们可以在同一 Python 进程中检查其进度。
# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time
# Method to call the label propagation algorithm from a thread
def run_label_prop():
print("Running label propagation")
result = gds.labelPropagation.mutate(
g,
mutateProperty="communityID"
)
print(result)
# Method to get and pretty-print the algorithm progress
def run_list_progress():
result = gds.beta.listProgress()
print(result)
# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()
# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)
# Check the algorithm progress
run_list_progress()
# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)
# Check the algorithm progress again
run_list_progress()
# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()
CALL gds.labelPropagation.mutate(
'example-graph',
{mutateProperty: 'communityID'}
)
YIELD preProcessingMillis,
computeMillis,
mutateMillis,
postProcessingMillis,
nodePropertiesWritten,
communityCount,
ranIterations,
didConverge,
communityDistribution,
configuration
RETURN *
// The following query has to be run in another Cypher shell, so run this command
// in a different terminal first:
//
// ./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD
CALL gds.beta.listProgress()
YIELD jobId, taskName, progress, progressBar
RETURN *
# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time
# Method to call the label propagation algorithm from a thread
def run_label_prop():
label_prop_mutate_example_graph_query = """
CALL gds.labelPropagation.mutate(
'example-graph',
{mutateProperty: 'communityID'}
)
YIELD preProcessingMillis,
computeMillis,
mutateMillis,
postProcessingMillis,
nodePropertiesWritten,
communityCount,
ranIterations,
didConverge,
communityDistribution,
configuration
RETURN *
"""
# Create the driver session
with driver.session() as session:
# Run query
print("Running label propagation")
results = session.run(label_prop_mutate_example_graph_query).data()
# Prettify the first result
print(json.dumps(results[0], indent=2, sort_keys=True))
# Method to get and pretty-print the algorithm progress
def run_list_progress():
gds_list_progress_query = """
CALL gds.beta.listProgress()
YIELD jobId, taskName, progress, progressBar
RETURN *
"""
# Create the driver session
with driver.session() as session:
# Run query
print('running list progress')
results = session.run(gds_list_progress_query).data()
# Prettify the first result
print('list progress results: ')
print(json.dumps(results[0], indent=2, sort_keys=True))
# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()
# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)
# Check the algorithm progress
run_list_progress()
# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)
# Check the algorithm progress again
run_list_progress()
# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()
清理
现在可以删除内存中的图。
result = gds.graph.drop(g)
print(result)
CALL gds.graph.drop('example-graph')
delete_example_in_memory_graph_query = """
CALL gds.graph.drop('example-graph')
"""
with driver.session() as session:
# Run query
results = session.run(delete_example_in_memory_graph_query).data()
# Prettify the results
print(json.dumps(results, indent=2, sort_keys=True, default=default))
参考文献
Cypher
-
了解有关 Cypher 语法的更多信息
-
您可以使用 Cypher 速查表 作为所有可用 Cypher 功能的参考