Pregel API

AuraDS 中没有此功能。

简介

Pregel 是一种以顶点为中心的计算模型,用于通过用户定义的计算函数定义您自己的算法。节点值可以在计算函数中更新,并表示算法结果。输入图包含默认节点值或来自图投影的节点值。

计算函数在多个迭代中执行,也称为超步。在每个超步中,计算函数会针对图中的每个节点运行。在该函数内,一个节点可以接收来自其他节点的消息,通常是其邻居。根据接收到的消息和其当前存储的值,节点可以计算一个新值。节点还可以向其他节点(通常是其邻居)发送消息,这些消息将在下一个超步中接收。算法将在固定数量的超步后终止,或者如果节点之间没有发送任何消息时终止。

Pregel 计算是并行执行的。每个线程为一批节点执行计算函数。

有关 Pregel 的更多信息,请查看 https://kowshik.github.io/JPregel/pregel_paper.pdf

为了实现您自己的 Pregel 算法,图数据科学库提供了一个 Java API,该 API 在下面描述。

引入新的 Pregel 算法可以分为两个主要步骤。首先,我们需要使用 Pregel Java API 实现算法。其次,我们需要通过 Cypher 过程公开算法,以便使用它。

有关如何通过 Neo4j 过程公开自定义 Pregel 计算的示例,请查看 Pregel 示例

Pregel Java API

Pregel Java API 允许我们通过实现几个接口来轻松构建自己的算法。

计算

第一步是实现 org.neo4j.gds.beta.pregel.PregelComputation 接口。它是使用 Pregel 框架表达用户定义逻辑的主要接口。

Pregel 节点值
public interface PregelComputation<C extends PregelConfig> {
    // The schema describes the node property layout.
    PregelSchema schema();
    // Called in the first superstep and allows initializing node state.
    default void init(PregelContext.InitContext<C> context) {}
    // Called in each superstep for each node and contains the main logic.
    void compute(PregelContext.ComputeContext<C> context, Pregel.Messages messages);
    // Called exactly once at the end of each superstep by a single thread.
    default void masterCompute(MasterComputeContext<C> context) {}
    // Used to combine all messages sent to a node to a single value.
    default Optional<Reducer> reducer() {
        return Optional.empty();
    }
    // Used to apply a relationship weight on a message.
    default double applyRelationshipWeight(double message, double relationshipWeight);
    // Used to close any opened resources, such as ThreadLocals
    default void close() {}
}

Pregel 节点值是复合值。schema 描述了该复合值的布局。架构的每个元素都可以表示一个基本 long 或 double 值,以及这些值的数组。该元素由一个键唯一标识,该键用于在计算期间访问该值。有关架构声明的详细信息,请参见 专用部分

init 方法在 Pregel 计算的第一个超步开始时调用,并允许初始化节点值。该接口定义了一个抽象的 compute 方法,该方法在每个超步中为每个节点调用。算法特定的逻辑在 compute 方法中表达。context 参数提供对投影图的节点属性和算法配置的访问。

compute 方法在每个超步中单独为每个节点调用,只要节点接收消息或尚未投票停止。由于 PregelComputation 的实现是无状态的,因此节点只能通过消息与其他节点通信。在每个超步中,节点接收 messages,并且可以通过 context 参数发送新消息。消息可以发送到邻居节点或任何已知其标识符的节点。

masterCompute 方法在每个超级步的最后被调用一次。它由单个线程执行,可用于根据当前计算状态修改全局状态。有关使用主计算的详细信息,请参见 专用部分

可选的 reducer 可用于定义应用于发送到单个节点的消息的函数。它接受两个参数,当前值和消息值,并生成一个新值。该函数被重复调用,每个发送到节点的消息都调用一次。最终,在下一个超级步中,该节点将只收到一条消息。通过定义 reducer,可以显着提高内存消耗和计算运行时间。有关更多详细信息,请查看 专用部分

applyRelationshipWeight 方法可用于根据关系属性修改消息。如果输入图没有关系属性,即未加权,则跳过该方法。

close 方法可用于关闭在实现过程中打开的任何资源。这包括 ThreadLocals、文件句柄、网络连接或任何在算法完成计算后不应保持活动状态的其他内容。

Pregel 架构

在 Pregel 中,每个节点都与一个值相关联,该值可以在 compute 方法中访问。该值通常用于表示中间计算状态,最终表示计算结果。为了表示复杂状态,节点值是一种复合类型,它包含一个或多个命名值。从 compute 函数的角度来看,可以通过其名称访问这些值中的每一个。

在实现 PregelComputation 时,必须重写 schema() 方法。以下示例展示了最简单的示例。

PregelSchema schema() {
    return PregelSchema.Builder().add("foobar", ValueType.LONG).build();
}

节点值包含一个名为 foobar 的值,其类型为 long。节点值可以是任何 GDS 支持的类型,例如 longdoublelong[]double[]float[]

我们可以向架构添加任意数量的值。

PregelSchema schema() {
    return PregelSchema.Builder()
        .add("foobar", ValueType.LONG)
        .add("baz", ValueType.DOUBLE)
        .build();
}

请注意,每个属性在执行算法时都会消耗额外的内存,这通常相当于节点数量乘以单个值的大小(例如,对于 longdouble,为 64 位)。

构建器上的 add 方法接受第三个参数:Visibility。有两个可能的值:PUBLIC(默认)和 PRIVATE。可见性在 过程代码生成 期间被考虑,以指示该值是否为 Pregel 结果的一部分。任何具有 PUBLIC 可见性的值都将成为计算结果的一部分,并包含在过程的结果中,例如,流式传输到调用者、突变到内存中图或写入数据库。

以下示例展示了一种架构,其中一个值用作结果,而第二个值仅在计算过程中使用。

PregelSchema schema() {
    return PregelSchema.Builder()
        .add("result", ValueType.LONG, Visiblity.PUBLIC)
        .add("tempValue", ValueType.DOUBLE, Visiblity.PRIVATE)
        .build();
}

初始化上下文和计算上下文

这两个上下文对象的 主要 目的是使计算能够与 Pregel 框架通信。上下文是有状态的,其所有方法都受当前超级步和当前处理的节点的影响。两个上下文对象共享一组方法,例如,用于访问配置和节点状态。此外,每个上下文都添加了特定于上下文的 方法。

org.neo4j.gds.beta.pregel.PregelContext.InitContext 在 Pregel 计算的 init 方法中可用。它提供对存储在内存中图中的节点属性的访问。我们可以将初始节点状态设置为固定值,例如节点 ID,或者使用图属性和用户定义的配置来初始化依赖于上下文的 状态。

InitContext
public final class InitContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Sets a double array node value for the given schema key.
    public void setNodeValue(String key, double[] value);
    // Sets a long array node value for the given schema key.
    public void setNodeValue(String key, long[] value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Number of relationships of the current node.
    public int degree();
    // Available node property keys in the input graph.
    public Set<String> nodePropertyKeys();
    // Node properties stored in the input graph.
    public NodeProperties nodeProperties(String key);
}

相反,org.neo4j.gds.beta.pregel.PregelContext.ComputeContext 可以在 compute 方法中访问。该上下文提供访问计算状态的方法,例如当前超级步,以及向图中的其他节点发送消息的方法。

ComputeContext
public final class ComputeContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Indicates whether the input graph is a multi-graph.
    public boolean isMultiGraph();
    // Number of relationships of the current node.
    public int degree();
    // Double value for the given node schema key.
    public double doubleNodeValue(String key);
    // Double value for the given node schema key.
    public long longNodeValue(String key);
    // Double array value for the given node schema key.
    public double[] doubleArrayNodeValue(String key);
    // Long array value for the given node schema key.
    public long[] longArrayNodeValue(String key);
    // Notify the framework that the node intends to stop its computation.
    public void voteToHalt();
    // Indicates whether this is superstep 0.
    public boolean isInitialSuperstep();
    // 0-based superstep identifier.
    public int superstep();
    // Sends the given message to all neighbors of the node.
    public void sendToNeighbors(double message);
    // Sends the given message to the target node.
    public void sendTo(long targetNodeId, double message);
    // Stream of neighbor ids of the current node.
    public LongStream getNeighbours();
}

主计算

一些 Pregel 程序可能需要在所有线程完成当前超级步后执行的逻辑,例如,重置或评估全局数据结构。这可以通过重写 org.neo4j.gds.beta.pregel.PregelComputation.masterCompute 函数来实现。此函数将在所有计算线程完成后的每个超级步结束时被调用。主计算函数将由单个线程调用。

masterCompute 函数可以访问 org.neo4j.gds.beta.pregel.PregelContext.MasterComputeContext。该上下文类似于 ComputeContext,但没有绑定到特定节点,也不允许发送消息。此外,MasterComputeContext 允许对图中的每个节点运行一个函数,并访问所有节点的计算状态。

MasterComputeContext
public final class MasterComputeContext {
    // User-defined Pregel configuration
    public PregelConfig config();
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Indicates whether the input graph is a multi-graph.
    public boolean isMultiGraph();
    // Run the given consumer for every node in the graph.
    public void forEachNode(LongPredicate consumer);
    // Double value for the given node schema key.
    public double doubleNodeValue(long nodeId, String key);
    // Double value for the given node schema key.
    public long longNodeValue(long nodeId, String key);
    // Double array value for the given node schema key.
    public double[] doubleArrayNodeValue(long nodeId, String key);
    // Long array value for the given node schema key.
    public long[] longArrayNodeValue(long nodeId, String key);
    // Sets a double node value for the given schema key.
    public void setNodeValue(long nodeId, String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(long nodeId, String key, long value);
    // Sets a double array node value for the given schema key.
    public void setNodeValue(long nodeId, String key, double[] value);
    // Sets a long array node value for the given schema key.
    public void setNodeValue(long nodeId, String key, long[] value);
    // Indicates whether this is superstep 0.
    public boolean isInitialSuperstep();
    // 0-based superstep identifier.
    public int superstep();
}

消息 reducer

许多 Pregel 计算依赖于从发送到节点的所有消息中计算单个值。例如,PageRank 算法计算发送到单个节点的所有消息的总和。在这些情况下,可以使用 reducer 将所有消息组合成单个值。如果适用,此优化可以提高内存消耗和计算运行时间。

默认情况下,Pregel 计算不会使用 reducer。发送到节点的所有消息都存储在队列中,并在下一个超级步中接收。为了启用消息缩减,需要实现 reducer 方法并提供自定义或预定义的 reducer。

需要实现的 Reducer 接口。
public interface Reducer {
    // The identity element is used as the initial value.
    double identity();
    // Computes a new value based on the current value and the message.
    double reduce(double current, double message);
}

标识值用作 reduce 函数中 current 参数的初始值。所有后续调用都使用上一次调用的结果作为 current 值。

该框架已经提供了用于计算消息的最小值、最大值、总和和计数的实现。默认实现是 Reducer 接口的一部分,可以按以下方式应用。

在自定义计算中应用总和 reducer。
public class CustomComputation implements PregelComputation<PregelConfig> {

    @Override
    public void compute(PregelContext.ComputeContext<CustomConfig> context, Pregel.Messages messages) {
        // ...
        for (var message : messages) {
            // ...
        }
    }

    @Override
    public Optional<Reducer> reducer() {
        return Optional.of(new Reducer.Sum());
    }
}

不需要调整 compute 方法的实现。如果存在 reducer,则 messages 迭代器包含零个或一个消息。请注意,定义 reducer 将排除使用异步消息传递运行计算。在这种情况下,配置中的 isAsynchronous 标志将被忽略。

配置

为了配置自定义 Pregel 计算的执行,该框架需要一个配置。org.neo4j.gds.beta.pregel.PregelConfig 提供执行计算的最小选项集。配置选项还映射到可以通过自定义过程稍后设置的参数。这等同于 GDS 库中的所有其他算法。

表 1. Pregel 配置
名称 类型 默认值 描述

maxIterations

整数

-

计算将终止后的最大超级步数量。

isAsynchronous

布尔值

false

指示是否可以在同一超级步中发送和接收消息的标志。

partitioning

字符串

"range"

选择输入图的划分,可以是 "range"、"degree" 或 "auto"。

relationshipWeightProperty

字符串

null

用作权重的关系属性的名称。如果未指定,则算法将无权重运行。

concurrency

整数

4

执行 Pregel 计算时使用的并发性。

writeConcurrency

整数

concurrency

将计算结果写入 Neo4j 时使用的并发性。

writeProperty

字符串

"pregel_"

在写入模式下追加到节点架构键的字符串前缀。

mutateProperty

字符串

"pregel_"

在突变模式下追加到节点架构键的字符串前缀。

对于某些算法,我们想要指定其他配置选项。

通常,这些选项是特定于算法的参数,例如阈值。创建自定义配置的另一个原因与计算的初始化阶段相关。如果我们想要根据图属性初始化节点状态,则需要通过其键访问该属性。由于这些键是图的动态属性,因此我们需要将它们提供给计算。我们可以通过在自定义配置中声明一个用于设置该键的选项来实现这一点。

如果用户定义的 Pregel 计算需要自定义选项,则可以通过扩展 PregelConfig 来创建自定义配置。

自定义配置及其在初始化阶段的用法。
@ValueClass
@Configuration
public interface CustomConfig extends PregelConfig {
    // A property key that refers to a seed property.
    String seedProperty();
    // An algorithm specific parameter.
    int minDegree();
}

public class CustomComputation implements PregelComputation<CustomConfig> {

    @Override
    public void init(PregelContext.InitContext<CustomConfig> context) {
        // Use the custom config key to access a graph property.
        var seedProperties = context.nodeProperties(context.config().seedProperty());
        // Init the node state with the graph property for that node.
        context.setNodeValue("state", seedProperties.doubleValue(context.nodeId()));
    }

    @Override
    public void compute(PregelContext.ComputeContext<CustomConfig> context, Pregel.Messages messages) {
        if (context.degree() >= context.config().minDegree()) {
            // ...
        }
    }

    // ...
}

遍历传入的关系

在 Pregel 中实现的一些算法可能需要或从能够访问和向当前上下文节点的所有传入关系发送消息的能力中受益。GDS 支持为关系类型创建反向索引,这使得能够为有向关系类型遍历传入关系。

Pregel 算法可以通过实现 org.neo4j.gds.beta.pregel.BidirectionalPregelComputation 接口而不是 PregelComputation 接口来访问此索引。实现此接口具有以下结果

  • Pregel 框架将确保传递到算法的所有关系都被反向索引。如果没有这样的索引,将抛出错误。

  • initcompute 函数的签名现在分别接受 org.neo4j.gds.beta.pregel.context.InitContext.BidirectionalInitContextorg.neo4j.gds.beta.pregel.context.ComputeContext.BidirectionalComputeContext

  • 使用 @PregelProcedure 注释的算法将自动创建所有必需的反向索引。

BidirectionalInitContextBidirectionalComputeContexts 除了 InitContextComputeContext 定义的方法外,还公开了以下新方法

//Returns the incoming degree (number of relationships) of the currently processed node.
public int incomingDegree();
// Calls the consumer for each incoming neighbor of the currently processed node.
public void forEachIncomingNeighbor(LongConsumer targetConsumer);
// Calls the consumer for each incoming neighbor of the given node.
public void forEachIncomingNeighbor(long nodeId, LongConsumer targetConsumer);
// Calls the consumer once for each incoming neighbor of the currently processed node.
public void forEachDistinctIncomingNeighbor(LongConsumer targetConsumer);
// Calls the consumer once for each incoming neighbor of the given node.
public void forEachDistinctIncomingNeighbor(long nodeId, LongConsumer targetConsumer);

此外,BidirectionalComputeContext 还公开了以下函数

// Sends the given message to all neighbors of the node.
public void sendToIncomingNeighbors(double message);

日志记录

以下方法可用于所有上下文(InitContextComputeContextMasterComputeContext),以将自定义消息注入算法执行的进度日志。

日志方法可在 Pregel 上下文中使用
// All contexts inherit from PregelContext
public abstract class PregelContext<CONFIG extends PregelConfig> {

    // Log a debug message to the Neo4j log.
    public void logDebug(String message) {
        progressTracker.logDebug(message);
    }

    // Log a warning message to the Neo4j log.
    public void logWarning(String message) {
        progressTracker.logWarning(message);
    }

    // Log a info message to the Neo4j log
    public void logMessage(String message) {
        progressTracker.logMessage(message);
    }

}

节点 ID 空间转换

一些算法需要用户提供节点作为输入。例如,最短路径算法需要知道起点和终点节点。在 GDS 中,存在两种节点 ID 空间:原始 ID 空间和内部 ID 空间。原始 ID 空间是内存图投影到的图的节点 ID。通常,这些是 Neo4j 节点标识符。内部 ID 空间表示内存图的节点 ID,并且始终是从 ID 0 开始的连续空间。Pregel 计算使用内部节点 ID 空间,例如,ComputeContext#nodeId() 返回当前处理的节点的内部 ID。为了在原始节点 ID 空间和内部节点 ID 空间之间进行转换,所有上下文类都提供以下方法

可在所有 Pregel 上下文中使用的方法,用于在 ID 空间之间进行转换
// All contexts inherit from PregelContext
public abstract class PregelContext<CONFIG extends PregelConfig> {
    // Maps the given internal node to its original counterpart.
    public long toOriginalNodeId(long internalNodeId);
    // Maps the given original node to its internal counterpart.
    public long toInternalNodeId(long originalNodeId);
}

通过 Cypher 运行 Pregel

为了使自定义 Pregel 计算可通过 Cypher 访问,需要通过过程 API 公开它。GDS 中的 Pregel 框架提供了一种简单的方法,可以为所有默认模式生成过程。

过程生成

为了为计算生成过程,需要用 @org.neo4j.gds.beta.pregel.annotation.PregelProcedure 注解它。此外,自定义计算的配置参数必须是 org.neo4j.gds.beta.pregel.PregelProcedureConfig 的子类型。

使用 @PregelProcedure 注解来配置代码生成。
@PregelProcedure(
    name = "custom.pregel.proc",
    modes = {GDSMode.STREAM, GDSMode.WRITE},
    description = "My custom Pregel algorithm"
)
public class CustomComputation implements PregelComputation<PregelProcedureConfig> {
    // ...
}

该注解为代码生成提供了一些配置选项。

表 2. 配置
名称 类型 默认值 描述

名称

字符串

-

生成的程序名称的前缀。它由模式附加。

模式

列表

[STREAM、WRITE、MUTATE、STATS]

为每个指定的模式生成一个过程。

描述

字符串

""

dbms.listProcedures() 中打印的程序描述。

对于上面的代码段,我们将生成四个过程

  • custom.pregel.proc.stream

  • custom.pregel.proc.stream.estimate

  • custom.pregel.proc.write

  • custom.pregel.proc.write.estimate

请注意,默认情况下,PregelSchema 中指定的所有值都包含在过程结果中。要更改此行为,我们可以更改架构各个部分的可见性。有关更多详细信息,请参阅 专用文档部分

构建和安装 Neo4j 插件

为了在 Neo4j 中通过过程使用 Pregel 算法,我们需要将其打包为 Neo4j 插件。pregel-bootstrap 项目是一个良好的起点。项目中的 build.gradle 文件包含实现 Pregel 算法并生成相应过程所需的所有依赖项。

确保根据您的设置更改 gdsVersionneo4jVersion。GDS 和 Neo4j 是运行时依赖项。因此,需要将 GDS 安装为 Neo4j 服务器上的插件。

要构建项目并创建插件 jar,只需运行

./gradlew shadowJar

您可以在 build/libs 中找到 pregel-bootstrap.jar。该 jar 需要放置在 Neo4j 安装中的 plugins 目录中,与 GDS 插件 jar 放在一起。为了能够在 Cypher 中访问该过程,可能需要将它的命名空间添加到 neo4j.conf 文件中。

neo4j.conf 中启用示例过程
dbms.security.procedures.unrestricted=custom.pregel.proc.*
dbms.security.procedures.allowlist=custom.pregel.proc.*

示例

pregel-examples 模块包含一组 Pregel 算法示例。算法实现演示了 Pregel API 的用法。除了每个示例之外,我们还提供了测试类,这些类可以作为编写自定义算法测试的指南。要进行尝试,我们建议将其中一个算法复制到 pregel-bootstrap 项目中,构建它并在 Neo4j 中设置插件。