机器学习管道:节点分类

Open In Colab

此 Jupyter 笔记本托管在 Neo4j 图数据科学客户端 Github 存储库中的此处

该笔记本演示了如何使用 Python 客户端和著名的Cora 数据集使用 GDS 机器学习管道。

我们在此处介绍的任务是图机器学习中的一个典型用例:给定图和一些节点特征,对节点进行分类。

1. 设置

我们需要一个专用的环境,其中 Neo4j 和 GDS 可用,例如一个新的 AuraDS 实例(预装了 GDS)或带有专用数据库的 Neo4j 桌面版。

请注意,我们将写入和删除 Neo4j 中的数据。

一旦可以使用访问此环境的凭据,我们就可以安装 graphdatascience 包并导入客户端类。

%pip install graphdatascience
import os
from graphdatascience import GraphDataScience

在使用本地 Neo4j 设置时,默认连接 URI 为 bolt://#:7687;而在使用 AuraDS 时,连接 URI 略有不同,因为它使用 neo4j+s 协议。在这种情况下,客户端还应包含 aura_ds=True 标志以启用 AuraDS 推荐的设置。有关更多详细信息,请查看Neo4j GDS 客户端文档

# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://#:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
    NEO4J_AUTH = (
        os.environ.get("NEO4J_USER"),
        os.environ.get("NEO4J_PASSWORD"),
    )
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)

# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)

我们还需要检查 GDS 库的版本是否为 2.2.0 或更高版本,因为我们将使用在GDS 2.2.0中引入的“上下文”概念。

assert gds.version() >= "2.2.0"

最后,我们导入 json 以帮助编写用于加载数据的 Cypher 查询,以及 numpypandas 用于进一步的数据处理。

import json

import numpy as np
import pandas as pd

2. 加载 Cora 数据集

首先,我们需要在 Neo4j 上加载 Cora 数据集。最新版本的 GDS 客户端包含 Cora 数据集作为现成的图(例如,请参阅PyG 示例笔记本);或者,图构造笔记本显示了如何在内存中投影 Cora 图而无需将其写入 Neo4j。但是,在本教程中,我们使用 CSV 文件中的数据和一些 Cypher 代码来运行端到端示例,从将源数据加载到 Neo4j 到训练模型并将其用于预测。

请注意,如果您在 AuraDS 实例上使用 Cora 图加载器或图构造方法,则无法将数据写入 Neo4j 数据库。

CSV 文件可以在以下 URI 找到

CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"

加载后,我们需要执行额外的预处理步骤,将 subject 字段(在数据集中为字符串)转换为整数,因为节点属性必须为数字才能投影到图中;虽然我们可以分配连续的 ID,但我们将除 0 之外的 ID 分配给第一个主题,以便稍后显示类标签在模型中是如何表示的。

我们还选择一些节点以在模型训练后将其排除以进行测试。**注意:**这与算法测试/分割比率无关。

SUBJECT_TO_ID = {
    "Neural_Networks": 100,
    "Rule_Learning": 1,
    "Reinforcement_Learning": 2,
    "Probabilistic_Methods": 3,
    "Theory": 4,
    "Genetic_Algorithms": 5,
    "Case_Based": 6,
}

HOLDOUT_NODES = 10

现在,我们可以使用 LOAD CSV Cypher 语句和一些基本数据转换来加载 CSV 文件

# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")

# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
    LOAD CSV FROM "{CORA_CONTENT}" AS row
    WITH
      {subject_map} AS subject_to_id,
      toInteger(row[0]) AS extId,
      row[1] AS subject,
      toIntegerList(row[2..]) AS features
    MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
    WITH p LIMIT {HOLDOUT_NODES}
    REMOVE p:Paper
    SET p:UnclassifiedPaper
"""

# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
    LOAD CSV FROM "{CORA_CITES}" AS row
    MATCH (n), (m)
    WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
    MERGE (n)-[:CITES]->(m)
"""

# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)

将数据加载到 Neo4j 后,我们现在可以投影一个包含所有节点和 CITES 关系的图(作为无向图,并使用 SINGLE 聚合,以跳过由于添加反向方向而导致的重复关系)。

# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
    "cora-graph",
    {"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
    {"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)

最后,我们可以检查新投影的图中的节点和关系数量,以确保它已正确创建

assert G.node_count() == 2708
assert G.relationship_count() == 10556

3. 管道目录基础

加载数据集后,我们可以定义节点分类机器学习管道。

# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")

我们可以使用 list 方法检查管道是否已实际创建

# List all pipelines
gds.beta.pipeline.list()

# Alternatively, get the details of a specific pipeline object
gds.beta.pipeline.list(node_pipeline)

4. 配置管道

现在,我们可以配置管道。提醒一下,我们需要

  1. 选择可用节点属性的子集,将其用作机器学习模型的特征

  2. 配置训练/测试分割以及 k 折交叉验证的折数(可选)

  3. 配置候选模型以进行训练

  4. 配置自动调整(可选)在本例中,我们使用逻辑回归作为训练的候选模型,但其他算法(如随机森林)也可用。我们还设置了一些合理的起始参数,可以根据所需的指标进一步调整。

一些超参数(例如 penalty)可以是单个值或范围。如果它们表示为范围,则使用自动调整来搜索其最佳值。

configureAutoTuning 方法可用于设置要尝试的模型候选数量。这里我们选择 5 以缩短训练时间。

# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])

# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)

# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))

# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)

5. 训练管道

配置后的管道现在可以开始选择和训练模型了。我们还会运行训练估计,以确保有足够的资源在之后运行实际训练。

节点分类模型支持多种评估指标。这里我们使用全局指标 F1_WEIGHTED

**注意:**出于演示目的,concurrency 参数显式设置为 4(默认值)。库中的最大并发性对于 Neo4j 社区版限制为 4。

# Estimate the resources needed for training the model
node_pipeline.train_estimate(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

我们可以检查训练结果,例如打印已训练模型的评估指标。

# Uncomment to print all stats
# print(stats.to_json(indent=2))

# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]

6. 使用模型进行预测

训练后,模型就可以对未分类数据进行分类了。

使用 predict 模式的一种简单方法是仅流式传输预测结果。当图非常大时,这可能不切实际,因此应仅用于实验目的。

predicted = model.predict_stream(
    G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)

预测的结果是一个 Pandas DataFrame,其中包含每个节点的预测类和所有类的预测概率。

predicted

predictedProbabilities 字段中类的顺序在模型信息中给出,可用于检索预测类的预测概率。

请注意,类在 predictedProbabilities 字段中出现的顺序有些随意,因此访问每个概率的正确方法是通过从模型获得的类索引,而不是其位置。

# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)

# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
    lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)

predicted

7. 添加数据预处理步骤

通过添加更多特征或使用完全不同的特征,模型的质量可能会得到提高。一种方法是使用 FastRP 等算法,这些算法基于节点属性和图特征创建嵌入,可以通过 addNodeProperty 管道方法添加。此类属性是“瞬态”的,这意味着它们会由管道本身自动创建和删除。

在此示例中,我们还使用 contextNodeLabels 参数显式设置我们计算嵌入的节点类型,并且我们包含了已分类和未分类的节点。这样做很有用,因为使用的节点越多,生成的嵌入就越好。尽管这可能看起来违反直觉,但未分类的节点在训练期间并不需要完全未被观察到(例如,可以保留其邻居的信息)。更多信息可以在图机器学习出版物中找到,例如 图表示学习书籍

node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")

# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
    "fastRP",
    mutateProperty="embedding",
    embeddingDimension=512,
    propertyRatio=1.0,
    randomSeed=42,
    featureProperties=["features"],
    contextNodeLabels=["Paper", "UnclassifiedPaper"],
)

# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])

# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)

然后,训练过程与上一节相同。

# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model-fastrp",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

使用嵌入后,F1_WEIGHTED 指标更好。

print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])

可以使用相同的方式运行使用 predict_stream 的分类。

predicted_fastrp = model_fastrp.predict_stream(
    G,
    modelName="cora-pipeline-model-fastrp",
    includePredictedProbabilities=True,
    targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))

预测可以以 mutate 模式运行以提高性能,而不是流式传输结果,尤其是在多次使用预测值时。可以使用 streamNodeProperty 方法和 UnclassifiedPaper 类检索预测的节点。

model_fastrp.predict_mutate(
    G,
    mutateProperty="predictedClass",
    modelName="cora-pipeline-model-fastrp",
    predictedProbabilityProperty="predictedProbabilities",
    targetNodeLabels=["UnclassifiedPaper"],
)

predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp

这对于将分类结果与测试节点的原始 subject 值进行比较很有用,这些值必须从 Neo4j 数据库中检索,因为它已从投影图中排除。

# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())

# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])

# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")

正如我们所看到的,所有测试节点的预测都是准确的。

8. 将结果写回 Neo4j

将预测的类别写回图后,我们现在可以将其写回 Neo4j 数据库。

请注意,如果您在 AuraDS 上运行此笔记本,则此步骤不适用。

gds.graph.nodeProperties.write(
    G,
    node_properties=["predictedClass"],
    node_labels=["UnclassifiedPaper"],
)

9. 清理

当不再需要图、模型和管道时,应将其删除以释放内存。只有在 Neo4j 或 AuraDS 实例未重新启动的情况下才需要执行此操作,因为重新启动会自动清理所有内存内容。

model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()

G.drop()

如果 Neo4j 数据库不再有用,则需要显式清理。

gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")

关闭客户端也是一个好习惯。

gds.close()