机器学习管道:节点分类
1. 设置
我们需要一个专用的环境,其中 Neo4j 和 GDS 可用,例如一个新的 AuraDS 实例(预装了 GDS)或带有专用数据库的 Neo4j 桌面版。
请注意,我们将写入和删除 Neo4j 中的数据。
一旦可以使用访问此环境的凭据,我们就可以安装 graphdatascience
包并导入客户端类。
%pip install graphdatascience
import os
from graphdatascience import GraphDataScience
在使用本地 Neo4j 设置时,默认连接 URI 为 bolt://#:7687
;而在使用 AuraDS 时,连接 URI 略有不同,因为它使用 neo4j+s
协议。在这种情况下,客户端还应包含 aura_ds=True
标志以启用 AuraDS 推荐的设置。有关更多详细信息,请查看Neo4j GDS 客户端文档。
# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://#:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
NEO4J_AUTH = (
os.environ.get("NEO4J_USER"),
os.environ.get("NEO4J_PASSWORD"),
)
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)
# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)
我们还需要检查 GDS 库的版本是否为 2.2.0 或更高版本,因为我们将使用在GDS 2.2.0中引入的“上下文”概念。
assert gds.version() >= "2.2.0"
最后,我们导入 json
以帮助编写用于加载数据的 Cypher 查询,以及 numpy
和 pandas
用于进一步的数据处理。
import json
import numpy as np
import pandas as pd
2. 加载 Cora 数据集
首先,我们需要在 Neo4j 上加载 Cora 数据集。最新版本的 GDS 客户端包含 Cora 数据集作为现成的图(例如,请参阅PyG 示例笔记本);或者,图构造笔记本显示了如何在内存中投影 Cora 图而无需将其写入 Neo4j。但是,在本教程中,我们使用 CSV 文件中的数据和一些 Cypher 代码来运行端到端示例,从将源数据加载到 Neo4j 到训练模型并将其用于预测。
请注意,如果您在 AuraDS 实例上使用 Cora 图加载器或图构造方法,则无法将数据写入 Neo4j 数据库。
CSV 文件可以在以下 URI 找到
CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"
加载后,我们需要执行额外的预处理步骤,将 subject
字段(在数据集中为字符串)转换为整数,因为节点属性必须为数字才能投影到图中;虽然我们可以分配连续的 ID,但我们将除 0 之外的 ID 分配给第一个主题,以便稍后显示类标签在模型中是如何表示的。
我们还选择一些节点以在模型训练后将其排除以进行测试。**注意:**这与算法测试/分割比率无关。
SUBJECT_TO_ID = {
"Neural_Networks": 100,
"Rule_Learning": 1,
"Reinforcement_Learning": 2,
"Probabilistic_Methods": 3,
"Theory": 4,
"Genetic_Algorithms": 5,
"Case_Based": 6,
}
HOLDOUT_NODES = 10
现在,我们可以使用 LOAD CSV
Cypher 语句和一些基本数据转换来加载 CSV 文件
# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")
# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
LOAD CSV FROM "{CORA_CONTENT}" AS row
WITH
{subject_map} AS subject_to_id,
toInteger(row[0]) AS extId,
row[1] AS subject,
toIntegerList(row[2..]) AS features
MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
WITH p LIMIT {HOLDOUT_NODES}
REMOVE p:Paper
SET p:UnclassifiedPaper
"""
# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
LOAD CSV FROM "{CORA_CITES}" AS row
MATCH (n), (m)
WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
MERGE (n)-[:CITES]->(m)
"""
# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)
将数据加载到 Neo4j 后,我们现在可以投影一个包含所有节点和 CITES
关系的图(作为无向图,并使用 SINGLE
聚合,以跳过由于添加反向方向而导致的重复关系)。
# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
"cora-graph",
{"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
{"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)
最后,我们可以检查新投影的图中的节点和关系数量,以确保它已正确创建
assert G.node_count() == 2708
assert G.relationship_count() == 10556
3. 管道目录基础
加载数据集后,我们可以定义节点分类机器学习管道。
# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")
我们可以使用 list
方法检查管道是否已实际创建
# List all pipelines
gds.beta.pipeline.list()
# Alternatively, get the details of a specific pipeline object
gds.beta.pipeline.list(node_pipeline)
4. 配置管道
现在,我们可以配置管道。提醒一下,我们需要
-
选择可用节点属性的子集,将其用作机器学习模型的特征
-
配置训练/测试分割以及 k 折交叉验证的折数(可选)
-
配置候选模型以进行训练
-
配置自动调整(可选)在本例中,我们使用逻辑回归作为训练的候选模型,但其他算法(如随机森林)也可用。我们还设置了一些合理的起始参数,可以根据所需的指标进一步调整。
一些超参数(例如 penalty
)可以是单个值或范围。如果它们表示为范围,则使用自动调整来搜索其最佳值。
configureAutoTuning
方法可用于设置要尝试的模型候选数量。这里我们选择 5 以缩短训练时间。
# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])
# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)
# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)
5. 训练管道
配置后的管道现在可以开始选择和训练模型了。我们还会运行训练估计,以确保有足够的资源在之后运行实际训练。
节点分类模型支持多种评估指标。这里我们使用全局指标 F1_WEIGHTED
。
**注意:**出于演示目的,concurrency
参数显式设置为 4(默认值)。库中的最大并发性对于 Neo4j 社区版限制为 4。
# Estimate the resources needed for training the model
node_pipeline.train_estimate(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
我们可以检查训练结果,例如打印已训练模型的评估指标。
# Uncomment to print all stats
# print(stats.to_json(indent=2))
# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]
6. 使用模型进行预测
训练后,模型就可以对未分类数据进行分类了。
使用 predict
模式的一种简单方法是仅流式传输预测结果。当图非常大时,这可能不切实际,因此应仅用于实验目的。
predicted = model.predict_stream(
G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)
预测的结果是一个 Pandas DataFrame
,其中包含每个节点的预测类和所有类的预测概率。
predicted
predictedProbabilities
字段中类的顺序在模型信息中给出,可用于检索预测类的预测概率。
请注意,类在 predictedProbabilities
字段中出现的顺序有些随意,因此访问每个概率的正确方法是通过从模型获得的类索引,而不是其位置。
# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)
# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)
predicted
7. 添加数据预处理步骤
通过添加更多特征或使用完全不同的特征,模型的质量可能会得到提高。一种方法是使用 FastRP 等算法,这些算法基于节点属性和图特征创建嵌入,可以通过 addNodeProperty
管道方法添加。此类属性是“瞬态”的,这意味着它们会由管道本身自动创建和删除。
在此示例中,我们还使用 contextNodeLabels
参数显式设置我们计算嵌入的节点类型,并且我们包含了已分类和未分类的节点。这样做很有用,因为使用的节点越多,生成的嵌入就越好。尽管这可能看起来违反直觉,但未分类的节点在训练期间并不需要完全未被观察到(例如,可以保留其邻居的信息)。更多信息可以在图机器学习出版物中找到,例如 图表示学习书籍。
node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")
# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
"fastRP",
mutateProperty="embedding",
embeddingDimension=512,
propertyRatio=1.0,
randomSeed=42,
featureProperties=["features"],
contextNodeLabels=["Paper", "UnclassifiedPaper"],
)
# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])
# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)
然后,训练过程与上一节相同。
# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model-fastrp",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
使用嵌入后,F1_WEIGHTED
指标更好。
print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])
可以使用相同的方式运行使用 predict_stream
的分类。
predicted_fastrp = model_fastrp.predict_stream(
G,
modelName="cora-pipeline-model-fastrp",
includePredictedProbabilities=True,
targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))
预测可以以 mutate
模式运行以提高性能,而不是流式传输结果,尤其是在多次使用预测值时。可以使用 streamNodeProperty
方法和 UnclassifiedPaper
类检索预测的节点。
model_fastrp.predict_mutate(
G,
mutateProperty="predictedClass",
modelName="cora-pipeline-model-fastrp",
predictedProbabilityProperty="predictedProbabilities",
targetNodeLabels=["UnclassifiedPaper"],
)
predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp
这对于将分类结果与测试节点的原始 subject
值进行比较很有用,这些值必须从 Neo4j 数据库中检索,因为它已从投影图中排除。
# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())
# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])
# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")
正如我们所看到的,所有测试节点的预测都是准确的。
8. 将结果写回 Neo4j
将预测的类别写回图后,我们现在可以将其写回 Neo4j 数据库。
请注意,如果您在 AuraDS 上运行此笔记本,则此步骤不适用。
gds.graph.nodeProperties.write(
G,
node_properties=["predictedClass"],
node_labels=["UnclassifiedPaper"],
)
9. 清理
当不再需要图、模型和管道时,应将其删除以释放内存。只有在 Neo4j 或 AuraDS 实例未重新启动的情况下才需要执行此操作,因为重新启动会自动清理所有内存内容。
model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()
G.drop()
如果 Neo4j 数据库不再有用,则需要显式清理。
gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")
关闭客户端也是一个好习惯。
gds.close()