机器学习管道:节点分类
1. 设置
我们需要一个 Neo4j 和 GDS 可用的专用环境,例如全新的 AuraDS 实例(预装了 GDS)或带有专用数据库的 Neo4j Desktop。
请注意,我们将对 Neo4j 中的数据进行写入和删除操作。
获取访问此环境的凭据后,我们可以安装 `graphdatascience` 包并导入客户端类。
%pip install graphdatascience
import os
from graphdatascience import GraphDataScience
使用本地 Neo4j 设置时,默认连接 URI 为 `bolt://localhost:7687`;而使用 AuraDS 时,连接 URI 略有不同,因为它使用 `neo4j+s` 协议。在这种情况下,客户端还应包含 `aura_ds=True` 标志以启用 AuraDS 推荐设置。有关更多详细信息,请查看 Neo4j GDS 客户端文档。
# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
NEO4J_AUTH = (
os.environ.get("NEO4J_USER"),
os.environ.get("NEO4J_PASSWORD"),
)
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)
# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)
我们还需要检查 GDS 库的版本是否为 2.5.0 或更高版本。
from graphdatascience import ServerVersion
assert gds.server_version() >= ServerVersion(2, 5, 0)
最后,我们导入 `json` 以帮助编写用于加载数据的 Cypher 查询,并导入 `numpy` 和 `pandas` 以进行进一步的数据处理。
import json
import numpy as np
import pandas as pd
2. 加载 Cora 数据集
首先,我们需要在 Neo4j 上加载 Cora 数据集。最新版本的 GDS 客户端包含 Cora 数据集作为即用型图(例如,请参阅 PyG 示例 Notebook);或者,图构建 Notebook 展示了如何在不将其写入 Neo4j 的情况下将 Cora 图投影到内存中。无论如何,在本教程中,我们使用 CSV 文件中的数据和一些 Cypher 代码来运行一个端到端示例,从将源数据加载到 Neo4j 到训练模型并将其用于预测。
请注意,如果您在 AuraDS 实例上使用 Cora 图加载器或图构建方法,则无法将数据写入 Neo4j 数据库。
CSV 文件可在以下 URI 找到
CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"
加载后,我们需要执行额外的预处理步骤,将 `subject` 字段(在数据集中为字符串)转换为整数,因为节点属性必须是数值才能投影到图中;尽管我们可以分配连续的 ID,但我们为第一个主题分配了一个非 0 的 ID,以便稍后展示类标签在模型中是如何表示的。
我们还选择了一些节点进行保留,以便在模型训练后进行测试。注意:这与算法的测试/分割比率无关。
SUBJECT_TO_ID = {
"Neural_Networks": 100,
"Rule_Learning": 1,
"Reinforcement_Learning": 2,
"Probabilistic_Methods": 3,
"Theory": 4,
"Genetic_Algorithms": 5,
"Case_Based": 6,
}
HOLDOUT_NODES = 10
我们现在可以使用 `LOAD CSV` Cypher 语句和一些基本数据转换来加载 CSV 文件
# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")
# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
LOAD CSV FROM "{CORA_CONTENT}" AS row
WITH
{subject_map} AS subject_to_id,
toInteger(row[0]) AS extId,
row[1] AS subject,
toIntegerList(row[2..]) AS features
MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
WITH p LIMIT {HOLDOUT_NODES}
REMOVE p:Paper
SET p:UnclassifiedPaper
"""
# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
LOAD CSV FROM "{CORA_CITES}" AS row
MATCH (n), (m)
WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
MERGE (n)-[:CITES]->(m)
"""
# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)
数据加载到 Neo4j 后,我们现在可以投影一个图,其中包括所有节点和 `CITES` 关系作为无向关系(并使用 `SINGLE` 聚合,以跳过因添加反向而重复的关系)。
# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
"cora-graph",
{"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
{"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)
我们最终可以检查新投影图中的节点和关系数量,以确保其已正确创建
assert G.node_count() == 2708
assert G.relationship_count() == 10556
3. 管道目录基础
数据集加载后,我们可以定义一个节点分类机器学习管道。
# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")
我们可以使用 `list` 方法检查管道是否已实际创建
# List all pipelines
gds.pipeline.list()
# Alternatively, get the details of a specific pipeline object
gds.pipeline.list(node_pipeline)
4. 配置管道
我们现在可以配置管道。提醒一下,我们需要
-
选择可用节点属性的一个子集作为机器学习模型的特征
-
配置训练/测试分割以及 k 折交叉验证的折叠数量 (可选)
-
配置训练的候选模型
-
配置自动调优 (可选) 在此示例中,我们使用 Logistic Regression 作为训练的候选模型,但其他算法(如 Random Forest)也可用。我们还设置了一些合理的起始参数,可以根据所需的指标进一步调整。
某些超参数(例如 `penalty`)可以是单个值或范围。如果它们以范围表示,则使用自动调优来搜索其最佳值。
可以使用 `configureAutoTuning` 方法设置要尝试的模型候选数量。这里我们选择 5 个以缩短训练时间。
# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])
# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)
# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)
5. 训练管道
配置好的管道现在可以进行模型选择和训练。我们还会运行一次训练估算,以确保有足够的资源来运行实际的训练。
节点分类模型支持多种评估指标。这里我们使用全局指标 `F1_WEIGHTED`。
注意:为了演示目的,`concurrency` 参数被显式设置为 4(默认值)。对于 Neo4j 社区版,库中的最大并发数限制为 4。
# Estimate the resources needed for training the model
node_pipeline.train_estimate(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
我们可以检查训练结果,例如打印已训练模型的评估指标。
# Uncomment to print all stats
# print(stats.to_json(indent=2))
# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]
6. 使用模型进行预测
训练完成后,模型即可对未分类数据进行分类。
使用 `predict` 模式的一种简单方法是直接流式传输预测结果。当图非常大时,这可能不切实际,因此应仅用于实验目的。
predicted = model.predict_stream(
G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)
预测结果是一个 Pandas `DataFrame`,其中包含每个节点的预测类别和所有类别的预测概率。
predicted
`predictedProbabilities` 字段中类别的顺序在模型信息中给出,可用于检索预测类别的预测概率。
请注意,类在 `predictedProbabilities` 字段中出现的顺序是任意的,因此访问每个概率的正确方法是通过从模型中获取的类索引,而不是其位置。
# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)
# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)
predicted
7. 添加数据预处理步骤
通过添加更多特征或完全使用不同的特征,模型的质量可能会提高。一种方法是使用 FastRP 等算法,这些算法基于节点属性和图特征创建嵌入,可以通过 `addNodeProperty` 管道方法添加。这些属性是“瞬态”的,因为它们由管道本身自动创建和移除。
在此示例中,我们还使用 `contextNodeLabels` 参数来显式设置我们计算嵌入的节点类型,并且我们包括已分类和未分类的节点。这很有用,因为使用的节点越多,生成的嵌入就越好。尽管这可能看起来与直觉相反,但未分类的节点在训练期间不必完全未被观察(例如,可以保留其邻居的信息)。更多信息可以在图机器学习出版物中找到,例如 《图表示学习》。
node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")
# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
"fastRP",
mutateProperty="embedding",
embeddingDimension=512,
propertyRatio=1.0,
randomSeed=42,
featureProperties=["features"],
contextNodeLabels=["Paper", "UnclassifiedPaper"],
)
# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])
# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)
然后训练过程与上一节相同
# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model-fastrp",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
使用嵌入后,`F1_WEIGHTED` 指标表现更好
print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])
使用 `predict_stream` 的分类可以以相同的方式运行
predicted_fastrp = model_fastrp.predict_stream(
G,
modelName="cora-pipeline-model-fastrp",
includePredictedProbabilities=True,
targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))
与流式传输结果不同,可以在 `mutate` 模式下运行预测以获得更好的性能,尤其是在多次使用预测值时。可以使用 `nodeProperty.stream` 方法和 `UnclassifiedPaper` 类检索预测节点。
model_fastrp.predict_mutate(
G,
mutateProperty="predictedClass",
modelName="cora-pipeline-model-fastrp",
predictedProbabilityProperty="predictedProbabilities",
targetNodeLabels=["UnclassifiedPaper"],
)
predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp
这有助于将分类结果与测试节点的原始 `subject` 值进行比较,由于该值已从投影图中排除,因此必须从 Neo4j 数据库中检索。
# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())
# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])
# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")
如我们所见,所有测试节点的预测都是准确的。
8. 将结果写回 Neo4j
将预测类别写回图后,我们现在可以将其写回 Neo4j 数据库。
请注意,如果您在 AuraDS 上运行此 Notebook,则此步骤不适用。
gds.graph.nodeProperties.write(
G,
node_properties=["predictedClass"],
node_labels=["UnclassifiedPaper"],
)
9. 清理
当不再需要图、模型和管道时,应将其丢弃以释放内存。只有在 Neo4j 或 AuraDS 实例未重新启动的情况下才需要这样做,因为重新启动会清除所有内存中的内容。
model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()
G.drop()
如果不再有用,Neo4j 数据库需要显式清理
gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")
关闭客户端也是一个好习惯
gds.close()