Pregel API

此功能在 AuraDS 中不可用。

简介

Pregel 是一种以顶点为中心的计算模型,允许通过用户定义的 *compute* 函数来定义自己的算法。节点值可以在 compute 函数中更新,并表示算法结果。输入图包含默认节点值或来自图投影的节点值。

compute 函数在多次迭代中执行,这些迭代也称为 *超级步骤*。在每个超级步骤中,compute 函数会为图中的每个节点运行。在该函数中,节点可以从其他节点(通常是其邻居)接收消息。根据接收到的消息及其当前存储的值,节点可以计算出新值。节点还可以向其他节点(通常是其邻居)发送消息,这些消息将在下一个超级步骤中接收。算法在固定数量的超级步骤后终止,或者如果节点之间没有消息发送,则终止。

Pregel 计算是并行执行的。每个线程为一批节点执行 compute 函数。

有关 Pregel 的更多信息,请参阅 https://kowshik.github.io/JPregel/pregel_paper.pdf

要实现您自己的 Pregel 算法,图数据科学库提供了一个 Java API,如下所述。

引入新的 Pregel 算法可以分为两个主要步骤。首先,我们需要使用 Pregel Java API 实现算法。其次,我们需要通过 Cypher 过程公开算法以使用它。

有关如何通过 Neo4j 过程公开自定义 Pregel 计算的示例,请参阅 Pregel 示例

Pregel Java API

Pregel Java API 允许我们通过实现多个接口轻松构建自己的算法。

计算

第一步是实现 org.neo4j.gds.beta.pregel.PregelComputation 接口。它是使用 Pregel 框架表达用户定义逻辑的主要接口。

Pregel 计算
public interface PregelComputation<C extends PregelConfig> {
    // The schema describes the node property layout.
    PregelSchema schema();
    // Called in the first superstep and allows initializing node state.
    default void init(PregelContext.InitContext<C> context) {}
    // Called in each superstep for each node and contains the main logic.
    void compute(PregelContext.ComputeContext<C> context, Pregel.Messages messages);
    // Called exactly once at the end of each superstep by a single thread.
    default void masterCompute(MasterComputeContext<C> context) {}
    // Used to combine all messages sent to a node to a single value.
    default Optional<Reducer> reducer() {
        return Optional.empty();
    }
    // Used to apply a relationship weight on a message.
    default double applyRelationshipWeight(double message, double relationshipWeight);
    // Used to close any opened resources, such as ThreadLocals
    default void close() {}
}

Pregel 节点值是复合值。schema 描述了该复合值的布局。模式的每个元素可以表示原始的 long 或 double 值,以及这些值的数组。该元素由一个键唯一标识,该键用于在计算期间访问值。有关模式声明的详细信息,请参阅专用章节

init 方法在 Pregel 计算的第一个超级步骤开始时调用,并允许初始化节点值。该接口定义了一个抽象的 compute 方法,该方法在每个超级步骤中为每个节点调用。算法特定的逻辑在 compute 方法中表达。context 参数提供对投影图的节点属性和算法配置的访问。

只要节点接收到消息或尚未投票停止,compute 方法就会在每个超级步骤中单独为每个节点调用。由于 PregelComputation 的实现是无状态的,节点只能通过消息与其他节点通信。在每个超级步骤中,节点接收 messages 并可以通过 context 参数发送新消息。如果知道标识符,消息可以发送到邻居节点或任何节点。

masterCompute 方法在每个超级步骤结束时精确调用一次。它由单个线程执行,可用于根据当前计算状态修改全局状态。有关使用主计算的详细信息,请参阅专用章节

可选的 reducer 可用于定义一个应用于发送到单个节点的消息的函数。它接受两个参数:当前值和消息值,并生成一个新值。该函数会重复调用,对于发送到节点的每条消息调用一次。最终,在下一个超级步骤中,节点将只收到一条消息。通过定义 reducer,可以显著提高内存消耗和计算运行时。有关更多详细信息,请查看专用章节

applyRelationshipWeight 方法可用于根据关系属性修改消息。如果输入图没有关系属性(即未加权),则跳过此方法。

close 方法可用于关闭作为实现一部分打开的任何资源。这包括 ThreadLocals、文件句柄、网络连接或算法计算完成后不应保留的任何其他内容。

Pregel 模式

在 Pregel 中,每个节点都关联一个值,该值可以从 compute 方法中访问。该值通常用于表示中间计算状态,并最终表示计算结果。为了表示复杂状态,节点值是一个复合类型,由一个或多个命名值组成。从 compute 函数的角度来看,这些值中的每一个都可以通过其名称访问。

在实现 PregelComputation 时,必须重写 schema() 方法。以下示例展示了最简单的示例

PregelSchema schema() {
    return PregelSchema.Builder().add("foobar", ValueType.LONG).build();
}

节点值由一个名为 foobar 的单个值组成,其类型为 long。节点值可以是 GDS 支持的任何类型,即 longdoublelong[]double[]float[]

我们可以向模式添加任意数量的值

PregelSchema schema() {
    return PregelSchema.Builder()
        .add("foobar", ValueType.LONG)
        .add("baz", ValueType.DOUBLE)
        .build();
}

请注意,每个属性在执行算法时都会消耗额外的内存,这通常相当于节点数乘以单个值的大小(例如,longdouble 为 64 位)。

构建器上的 add 方法接受第三个参数:Visibility。有两个可能的值:PUBLIC(默认)和 PRIVATE。在过程代码生成期间会考虑可见性,以指示该值是否为 Pregel 结果的一部分。任何具有 PUBLIC 可见性的值都将成为计算结果的一部分,并包含在过程结果中,例如,流式传输给调用者、变异到内存图或写入数据库。

以下显示了一个模式,其中一个值用作结果,第二个值仅在计算期间使用

PregelSchema schema() {
    return PregelSchema.Builder()
        .add("result", ValueType.LONG, Visiblity.PUBLIC)
        .add("tempValue", ValueType.DOUBLE, Visiblity.PRIVATE)
        .build();
}

初始化上下文和计算上下文

这两个上下文对象的主要目的是使计算能够与 Pregel 框架通信。上下文是有状态的,其所有方法都受当前超级步骤和当前处理的节点的影响。两个上下文对象共享一组方法,例如,访问配置和节点状态。此外,每个上下文还添加了上下文特定的方法。

org.neo4j.gds.beta.pregel.PregelContext.InitContext 在 Pregel 计算的 init 方法中可用。它提供对存储在内存图中的节点属性的访问。我们可以将初始节点状态设置为固定值,例如节点 ID,或者使用图属性和用户定义的配置来初始化上下文相关的状态。

初始化上下文
public final class InitContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Sets a double array node value for the given schema key.
    public void setNodeValue(String key, double[] value);
    // Sets a long array node value for the given schema key.
    public void setNodeValue(String key, long[] value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Number of relationships of the current node.
    public int degree();
    // Available node property keys in the input graph.
    public Set<String> nodePropertyKeys();
    // Node properties stored in the input graph.
    public NodeProperties nodeProperties(String key);
}

相反,org.neo4j.gds.beta.pregel.PregelContext.ComputeContext 可以在 compute 方法内部访问。该上下文提供了访问计算状态(例如当前超级步骤)以及向图中其他节点发送消息的方法。

计算上下文
public final class ComputeContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Indicates whether the input graph is a multi-graph.
    public boolean isMultiGraph();
    // Number of relationships of the current node.
    public int degree();
    // Double value for the given node schema key.
    public double doubleNodeValue(String key);
    // Double value for the given node schema key.
    public long longNodeValue(String key);
    // Double array value for the given node schema key.
    public double[] doubleArrayNodeValue(String key);
    // Long array value for the given node schema key.
    public long[] longArrayNodeValue(String key);
    // Notify the framework that the node intends to stop its computation.
    public void voteToHalt();
    // Indicates whether this is superstep 0.
    public boolean isInitialSuperstep();
    // 0-based superstep identifier.
    public int superstep();
    // Sends the given message to all neighbors of the node.
    public void sendToNeighbors(double message);
    // Sends the given message to the target node.
    public void sendTo(long targetNodeId, double message);
    // Stream of neighbor ids of the current node.
    public LongStream getNeighbours();
}

主计算

一些 Pregel 程序可能需要所有线程完成当前超级步骤后执行的逻辑,例如,重置或评估全局数据结构。这可以通过覆盖 PregelComputationorg.neo4j.gds.beta.pregel.PregelComputation.masterCompute 函数来实现。此函数将在所有计算线程完成后,在每个超级步骤结束时调用。主计算函数将由单个线程调用。

masterCompute 函数可以访问 org.neo4j.gds.beta.pregel.PregelContext.MasterComputeContext。该上下文类似于 ComputeContext,但它不与特定节点绑定,也不允许发送消息。此外,MasterComputeContext 允许为图中的每个节点运行一个函数,并可以访问所有节点的计算状态。

主计算上下文
public final class MasterComputeContext {
    // User-defined Pregel configuration
    public PregelConfig config();
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Indicates whether the input graph is a multi-graph.
    public boolean isMultiGraph();
    // Run the given consumer for every node in the graph.
    public void forEachNode(LongPredicate consumer);
    // Double value for the given node schema key.
    public double doubleNodeValue(long nodeId, String key);
    // Double value for the given node schema key.
    public long longNodeValue(long nodeId, String key);
    // Double array value for the given node schema key.
    public double[] doubleArrayNodeValue(long nodeId, String key);
    // Long array value for the given node schema key.
    public long[] longArrayNodeValue(long nodeId, String key);
    // Sets a double node value for the given schema key.
    public void setNodeValue(long nodeId, String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(long nodeId, String key, long value);
    // Sets a double array node value for the given schema key.
    public void setNodeValue(long nodeId, String key, double[] value);
    // Sets a long array node value for the given schema key.
    public void setNodeValue(long nodeId, String key, long[] value);
    // Indicates whether this is superstep 0.
    public boolean isInitialSuperstep();
    // 0-based superstep identifier.
    public int superstep();
}

消息归约器

许多 Pregel 计算依赖于从发送到节点的所有消息中计算单个值。例如,PageRank 算法计算发送到单个节点的所有消息的总和。在这些情况下,可以使用归约器将所有消息组合成一个值。如果适用,此优化可以显著改善内存消耗和计算运行时。

默认情况下,Pregel 计算不使用归约器。发送到节点的所有消息都存储在队列中,并在下一个超级步骤中接收。要启用消息归约,需要实现 reducer 方法并提供自定义或预定义的归约器。

需要实现的归约器接口。
public interface Reducer {
    // The identity element is used as the initial value.
    double identity();
    // Computes a new value based on the current value and the message.
    double reduce(double current, double message);
}

标识值用作 reduce 函数中 current 参数的初始值。所有后续调用都使用上一次调用的结果作为 current 值。

该框架已经提供了计算消息的最小值、最大值、总和和计数的实现。默认实现是 Reducer 接口的一部分,可以按如下方式应用

在自定义计算中应用求和归约器。
public class CustomComputation implements PregelComputation<PregelConfig> {

    @Override
    public void compute(PregelContext.ComputeContext<CustomConfig> context, Pregel.Messages messages) {
        // ...
        for (var message : messages) {
            // ...
        }
    }

    @Override
    public Optional<Reducer> reducer() {
        return Optional.of(new Reducer.Sum());
    }
}

compute 方法的实现不需要调整。如果存在归约器,则 messages 迭代器包含零条或一条消息。请注意,定义归约器会阻止使用异步消息传递进行计算。在这种情况下,配置中的 isAsynchronous 标志将被忽略。

配置

要配置自定义 Pregel 计算的执行,框架需要一个配置。org.neo4j.gds.beta.pregel.PregelConfig 提供了执行计算的最小选项集。这些配置选项也映射到稍后可以通过自定义过程设置的参数。这与 GDS 库中的所有其他算法相同。

表 1. Pregel 配置
名称 类型 默认值 可选 描述

maxIterations

整数

-

计算终止前的最大超级步骤数。

isAsynchronous

布尔值

false

指示消息是否可以在同一超级步骤中发送和接收的标志。

partitioning

字符串

"range"

选择输入图的分区方式,可以是 "range"、"degree" 或 "auto"。

concurrency

整数

4 [1]

用于运行算法的并发线程数。

jobId

字符串

内部生成

一个 ID,可以提供以便更轻松地跟踪算法的进度。

logProgress

布尔值

true

如果禁用,进度百分比将不会被记录。

relationshipWeightProperty

字符串

null

用作权重的关系属性名称。如果未指定,算法将运行无权重模式。

nodeLabels

字符串列表

['*']

使用给定的节点标签过滤命名图。将包含具有任何给定标签的节点。

relationshipTypes

字符串列表

['*']

使用给定的关系类型过滤命名图。将包含具有任何给定类型的关系。

concurrency

整数

4 [1]

用于运行算法的并发线程数。

jobId

字符串

内部生成

一个 ID,可以提供以便更轻松地跟踪算法的进度。

logProgress

布尔值

true

如果禁用,进度百分比将不会被记录。

writeConcurrency

整数

'concurrency' 的值

用于将结果写入 Neo4j 的并发线程数。

writeProperty

字符串

"pregel_"

在写入模式下,附加到节点模式键前缀的字符串。

mutateProperty

字符串

"pregel_"

在变异模式下,附加到节点模式键前缀的字符串。

GDS 会话 中,默认值是可用处理器的数量

对于某些算法,我们希望指定额外的配置选项。

通常,这些选项是算法特定的参数,例如阈值。自定义配置的另一个原因是计算的初始化阶段。如果我们要基于图属性初始化节点状态,我们需要通过其键访问该属性。由于这些键是图的动态属性,我们需要将它们提供给计算。我们可以通过声明一个选项在自定义配置中设置该键来实现这一点。

如果用户定义的 Pregel 计算需要自定义选项,可以通过扩展 PregelConfig 创建自定义配置。

自定义配置及其在初始化阶段的使用方法。
@ValueClass
@Configuration
public interface CustomConfig extends PregelConfig {
    // A property key that refers to a seed property.
    String seedProperty();
    // An algorithm specific parameter.
    int minDegree();
}

public class CustomComputation implements PregelComputation<CustomConfig> {

    @Override
    public void init(PregelContext.InitContext<CustomConfig> context) {
        // Use the custom config key to access a graph property.
        var seedProperties = context.nodeProperties(context.config().seedProperty());
        // Init the node state with the graph property for that node.
        context.setNodeValue("state", seedProperties.doubleValue(context.nodeId()));
    }

    @Override
    public void compute(PregelContext.ComputeContext<CustomConfig> context, Pregel.Messages messages) {
        if (context.degree() >= context.config().minDegree()) {
            // ...
        }
    }

    // ...
}

遍历传入关系

Pregel 中实现的一些算法可能需要或受益于访问和向当前上下文节点的所有传入关系发送消息的能力。GDS 支持为关系类型创建反向索引,这使得有向关系类型可以遍历传入关系。

Pregel 算法可以通过实现 org.neo4j.gds.beta.pregel.BidirectionalPregelComputation 接口而不是 PregelComputation 接口来访问此索引。实现此接口会产生以下后果

  • Pregel 框架将确保传递到算法中的所有关系都经过反向索引。如果不存在此类索引,将抛出错误。

  • initcompute 函数的签名现在分别接受 org.neo4j.gds.beta.pregel.context.InitContext.BidirectionalInitContextorg.neo4j.gds.beta.pregel.context.ComputeContext.BidirectionalComputeContext

  • 使用 @PregelProcedure 注解的算法将自动创建所有必要的反向索引。

除了 InitContextComputeContext 定义的方法之外,BidirectionalInitContextBidirectionalComputeContexts 还公开了以下新方法

//Returns the incoming degree (number of relationships) of the currently processed node.
public int incomingDegree();
// Calls the consumer for each incoming neighbor of the currently processed node.
public void forEachIncomingNeighbor(LongConsumer targetConsumer);
// Calls the consumer for each incoming neighbor of the given node.
public void forEachIncomingNeighbor(long nodeId, LongConsumer targetConsumer);
// Calls the consumer once for each incoming neighbor of the currently processed node.
public void forEachDistinctIncomingNeighbor(LongConsumer targetConsumer);
// Calls the consumer once for each incoming neighbor of the given node.
public void forEachDistinctIncomingNeighbor(long nodeId, LongConsumer targetConsumer);

此外,BidirectionalComputeContext 还公开了以下函数

// Sends the given message to all neighbors of the node.
public void sendToIncomingNeighbors(double message);

日志

以下方法适用于所有上下文(InitContextComputeContextMasterComputeContext),用于将自定义消息注入算法执行的进度日志中。

日志方法可在 Pregel 上下文中使用
// All contexts inherit from PregelContext
public abstract class PregelContext<CONFIG extends PregelConfig> {

    // Log a debug message to the Neo4j log.
    public void logDebug(String message) {
        progressTracker.logDebug(message);
    }

    // Log a warning message to the Neo4j log.
    public void logWarning(String message) {
        progressTracker.logWarning(message);
    }

    // Log a info message to the Neo4j log
    public void logMessage(String message) {
        progressTracker.logMessage(message);
    }

}

节点 ID 空间转换

某些算法需要用户提供节点作为输入。例如,最短路径算法需要知道起始节点和结束节点。在 GDS 中,存在两种节点 ID 空间:原始 ID 空间和内部 ID 空间。原始 ID 空间是内存图投影自的图的节点 ID。通常,这些是 Neo4j 节点标识符。内部 ID 空间表示内存图的节点 ID,并且始终是从 ID 0 开始的连续空间。Pregel 计算使用内部节点 ID 空间,例如,ComputeContext#nodeId() 返回当前处理节点的内部 ID。为了在原始节点 ID 空间和内部节点 ID 空间之间进行转换,所有上下文类都提供了以下方法

可在所有 Pregel 上下文中使用,在 ID 空间之间进行转换的方法
// All contexts inherit from PregelContext
public abstract class PregelContext<CONFIG extends PregelConfig> {
    // Maps the given internal node to its original counterpart.
    public long toOriginalNodeId(long internalNodeId);
    // Maps the given original node to its internal counterpart.
    public long toInternalNodeId(long originalNodeId);
}

通过 Cypher 运行 Pregel

要使自定义 Pregel 计算通过 Cypher 可访问,需要通过过程 API 将其公开。GDS 中的 Pregel 框架提供了一种简单的方法来为所有默认模式生成过程。

过程生成

要为计算生成过程,需要使用 @org.neo4j.gds.beta.pregel.annotation.PregelProcedure 注解进行标注。此外,自定义计算的配置参数必须是 org.neo4j.gds.beta.pregel.PregelProcedureConfig 的子类型。

使用 @PregelProcedure 注解配置代码生成。
@PregelProcedure(
    name = "custom.pregel.proc",
    modes = {GDSMode.STREAM, GDSMode.WRITE},
    description = "My custom Pregel algorithm"
)
public class CustomComputation implements PregelComputation<PregelProcedureConfig> {
    // ...
}

该注解为代码生成提供了许多配置选项。

表 2. 配置
名称 类型 默认值 描述

名称

字符串

-

生成的过程名称的前缀。模式将附加在其后。

模式

列表

[STREAM, WRITE, MUTATE, STATS]

为每个指定的模式生成一个过程。

描述

字符串

""

dbms.listProcedures() 中打印的过程描述。

对于上面的代码片段,我们生成四个过程

  • custom.pregel.proc.stream

  • custom.pregel.proc.stream.estimate

  • custom.pregel.proc.write

  • custom.pregel.proc.write.estimate

请注意,默认情况下,PregelSchema 中指定的所有值都包含在过程结果中。要更改此行为,我们可以更改模式中各个部分的可见性。有关更多详细信息,请参阅专用文档章节

构建和安装 Neo4j 插件

为了通过过程在 Neo4j 中使用 Pregel 算法,我们需要将其打包为 Neo4j 插件。pregel-bootstrap 项目是一个很好的起点。项目中的 build.gradle 文件包含实现 Pregel 算法和生成相应过程所需的所有依赖项。

请务必根据您的设置更改 gdsVersionneo4jVersion。GDS 和 Neo4j 是运行时依赖项。因此,GDS 需要作为插件安装在 Neo4j 服务器上。

要构建项目并创建插件 jar,只需运行

./gradlew shadowJar

您可以在 build/libs 中找到 pregel-bootstrap.jar。该 jar 需要与 GDS 插件 jar 一起放置在您的 Neo4j 安装目录中的 plugins 目录中。为了在 Cypher 中访问该过程,其命名空间可能需要添加到 neo4j.conf 文件中。

neo4j.conf 中启用示例过程
dbms.security.procedures.unrestricted=custom.pregel.proc.*
dbms.security.procedures.allowlist=custom.pregel.proc.*

示例

pregel-examples 模块包含一组 Pregel 算法示例。这些算法实现展示了 Pregel API 的用法。除了每个示例,我们还提供了测试类,可用作编写自定义算法测试的指南。为了尝试,我们建议将其中一个算法复制到 pregel-bootstrap 项目中,构建它并在 Neo4j 中设置插件。

© . All rights reserved.