机器学习管道:节点分类

Open In Colab

此 Jupyter Notebook 托管在 Neo4j 图数据科学客户端 GitHub 仓库的此处

此 Notebook 展示了 GDS 机器学习管道与 Python 客户端以及著名的 Cora 数据集的使用方法。

我们在此介绍的任务是图机器学习中的典型用例:给定图和一些节点特征,对节点进行分类。

1. 设置

我们需要一个 Neo4j 和 GDS 可用的专用环境,例如全新的 AuraDS 实例(预装了 GDS)或带有专用数据库的 Neo4j Desktop。

请注意,我们将对 Neo4j 中的数据进行写入和删除操作。

获取访问此环境的凭据后,我们可以安装 `graphdatascience` 包并导入客户端类。

%pip install graphdatascience
import os

from graphdatascience import GraphDataScience

使用本地 Neo4j 设置时,默认连接 URI 为 `bolt://localhost:7687`;而使用 AuraDS 时,连接 URI 略有不同,因为它使用 `neo4j+s` 协议。在这种情况下,客户端还应包含 `aura_ds=True` 标志以启用 AuraDS 推荐设置。有关更多详细信息,请查看 Neo4j GDS 客户端文档

# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
    NEO4J_AUTH = (
        os.environ.get("NEO4J_USER"),
        os.environ.get("NEO4J_PASSWORD"),
    )
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)

# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)

我们还需要检查 GDS 库的版本是否为 2.5.0 或更高版本。

from graphdatascience import ServerVersion

assert gds.server_version() >= ServerVersion(2, 5, 0)

最后,我们导入 `json` 以帮助编写用于加载数据的 Cypher 查询,并导入 `numpy` 和 `pandas` 以进行进一步的数据处理。

import json

import numpy as np
import pandas as pd

2. 加载 Cora 数据集

首先,我们需要在 Neo4j 上加载 Cora 数据集。最新版本的 GDS 客户端包含 Cora 数据集作为即用型图(例如,请参阅 PyG 示例 Notebook);或者,图构建 Notebook 展示了如何在不将其写入 Neo4j 的情况下将 Cora 图投影到内存中。无论如何,在本教程中,我们使用 CSV 文件中的数据和一些 Cypher 代码来运行一个端到端示例,从将源数据加载到 Neo4j 到训练模型并将其用于预测。

请注意,如果您在 AuraDS 实例上使用 Cora 图加载器或图构建方法,则无法将数据写入 Neo4j 数据库。

CSV 文件可在以下 URI 找到

CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"

加载后,我们需要执行额外的预处理步骤,将 `subject` 字段(在数据集中为字符串)转换为整数,因为节点属性必须是数值才能投影到图中;尽管我们可以分配连续的 ID,但我们为第一个主题分配了一个非 0 的 ID,以便稍后展示类标签在模型中是如何表示的。

我们还选择了一些节点进行保留,以便在模型训练后进行测试。注意:这与算法的测试/分割比率无关。

SUBJECT_TO_ID = {
    "Neural_Networks": 100,
    "Rule_Learning": 1,
    "Reinforcement_Learning": 2,
    "Probabilistic_Methods": 3,
    "Theory": 4,
    "Genetic_Algorithms": 5,
    "Case_Based": 6,
}

HOLDOUT_NODES = 10

我们现在可以使用 `LOAD CSV` Cypher 语句和一些基本数据转换来加载 CSV 文件

# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")

# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
    LOAD CSV FROM "{CORA_CONTENT}" AS row
    WITH
      {subject_map} AS subject_to_id,
      toInteger(row[0]) AS extId,
      row[1] AS subject,
      toIntegerList(row[2..]) AS features
    MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
    WITH p LIMIT {HOLDOUT_NODES}
    REMOVE p:Paper
    SET p:UnclassifiedPaper
"""

# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
    LOAD CSV FROM "{CORA_CITES}" AS row
    MATCH (n), (m)
    WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
    MERGE (n)-[:CITES]->(m)
"""

# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)

数据加载到 Neo4j 后,我们现在可以投影一个图,其中包括所有节点和 `CITES` 关系作为无向关系(并使用 `SINGLE` 聚合,以跳过因添加反向而重复的关系)。

# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
    "cora-graph",
    {"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
    {"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)

我们最终可以检查新投影图中的节点和关系数量,以确保其已正确创建

assert G.node_count() == 2708
assert G.relationship_count() == 10556

3. 管道目录基础

数据集加载后,我们可以定义一个节点分类机器学习管道。

# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")

我们可以使用 `list` 方法检查管道是否已实际创建

# List all pipelines
gds.pipeline.list()

# Alternatively, get the details of a specific pipeline object
gds.pipeline.list(node_pipeline)

4. 配置管道

我们现在可以配置管道。提醒一下,我们需要

  1. 选择可用节点属性的一个子集作为机器学习模型的特征

  2. 配置训练/测试分割以及 k 折交叉验证的折叠数量 (可选)

  3. 配置训练的候选模型

  4. 配置自动调优 (可选) 在此示例中,我们使用 Logistic Regression 作为训练的候选模型,但其他算法(如 Random Forest)也可用。我们还设置了一些合理的起始参数,可以根据所需的指标进一步调整。

某些超参数(例如 `penalty`)可以是单个值或范围。如果它们以范围表示,则使用自动调优来搜索其最佳值。

可以使用 `configureAutoTuning` 方法设置要尝试的模型候选数量。这里我们选择 5 个以缩短训练时间。

# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])

# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)

# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))

# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)

5. 训练管道

配置好的管道现在可以进行模型选择和训练。我们还会运行一次训练估算,以确保有足够的资源来运行实际的训练。

节点分类模型支持多种评估指标。这里我们使用全局指标 `F1_WEIGHTED`。

注意:为了演示目的,`concurrency` 参数被显式设置为 4(默认值)。对于 Neo4j 社区版,库中的最大并发数限制为 4。

# Estimate the resources needed for training the model
node_pipeline.train_estimate(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

我们可以检查训练结果,例如打印已训练模型的评估指标。

# Uncomment to print all stats
# print(stats.to_json(indent=2))

# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]

6. 使用模型进行预测

训练完成后,模型即可对未分类数据进行分类。

使用 `predict` 模式的一种简单方法是直接流式传输预测结果。当图非常大时,这可能不切实际,因此应仅用于实验目的。

predicted = model.predict_stream(
    G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)

预测结果是一个 Pandas `DataFrame`,其中包含每个节点的预测类别和所有类别的预测概率。

predicted

`predictedProbabilities` 字段中类别的顺序在模型信息中给出,可用于检索预测类别的预测概率。

请注意,类在 `predictedProbabilities` 字段中出现的顺序是任意的,因此访问每个概率的正确方法是通过从模型中获取的类索引,而不是其位置。

# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)

# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
    lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)

predicted

7. 添加数据预处理步骤

通过添加更多特征或完全使用不同的特征,模型的质量可能会提高。一种方法是使用 FastRP 等算法,这些算法基于节点属性和图特征创建嵌入,可以通过 `addNodeProperty` 管道方法添加。这些属性是“瞬态”的,因为它们由管道本身自动创建和移除。

在此示例中,我们还使用 `contextNodeLabels` 参数来显式设置我们计算嵌入的节点类型,并且我们包括已分类和未分类的节点。这很有用,因为使用的节点越多,生成的嵌入就越好。尽管这可能看起来与直觉相反,但未分类的节点在训练期间不必完全未被观察(例如,可以保留其邻居的信息)。更多信息可以在图机器学习出版物中找到,例如 《图表示学习》

node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")

# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
    "fastRP",
    mutateProperty="embedding",
    embeddingDimension=512,
    propertyRatio=1.0,
    randomSeed=42,
    featureProperties=["features"],
    contextNodeLabels=["Paper", "UnclassifiedPaper"],
)

# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])

# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)

然后训练过程与上一节相同

# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model-fastrp",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

使用嵌入后,`F1_WEIGHTED` 指标表现更好

print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])

使用 `predict_stream` 的分类可以以相同的方式运行

predicted_fastrp = model_fastrp.predict_stream(
    G,
    modelName="cora-pipeline-model-fastrp",
    includePredictedProbabilities=True,
    targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))

与流式传输结果不同,可以在 `mutate` 模式下运行预测以获得更好的性能,尤其是在多次使用预测值时。可以使用 `nodeProperty.stream` 方法和 `UnclassifiedPaper` 类检索预测节点。

model_fastrp.predict_mutate(
    G,
    mutateProperty="predictedClass",
    modelName="cora-pipeline-model-fastrp",
    predictedProbabilityProperty="predictedProbabilities",
    targetNodeLabels=["UnclassifiedPaper"],
)

predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp

这有助于将分类结果与测试节点的原始 `subject` 值进行比较,由于该值已从投影图中排除,因此必须从 Neo4j 数据库中检索。

# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())

# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])

# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")

如我们所见,所有测试节点的预测都是准确的。

8. 将结果写回 Neo4j

将预测类别写回图后,我们现在可以将其写回 Neo4j 数据库。

请注意,如果您在 AuraDS 上运行此 Notebook,则此步骤不适用。

gds.graph.nodeProperties.write(
    G,
    node_properties=["predictedClass"],
    node_labels=["UnclassifiedPaper"],
)

9. 清理

当不再需要图、模型和管道时,应将其丢弃以释放内存。只有在 Neo4j 或 AuraDS 实例未重新启动的情况下才需要这样做,因为重新启动会清除所有内存中的内容。

model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()

G.drop()

如果不再有用,Neo4j 数据库需要显式清理

gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")

关闭客户端也是一个好习惯

gds.close()
© . All rights reserved.