用户定义过程

用户定义过程是一种机制,通过编写可直接从 Cypher 调用的定制代码来扩展 Neo4j。过程可以接受参数、对数据库执行操作并返回结果。有关用户定义过程、函数和聚合函数之间的比较,请参见Neo4j 定制代码

用户定义过程如果需要在系统数据库上执行,则必须包含注解 @SystemProcedure,否则它们将被归类为用户数据库过程。

调用过程

要调用用户定义过程,请使用 Cypher CALL 子句。过程名称必须是完全限定的,因此在包 org.neo4j.examples 中定义的名为 findDenseNodes 的过程可以通过以下方式调用:

CALL org.neo4j.examples.findDenseNodes(1000)

CALL 可能是 Cypher 语句中的唯一子句,也可能与其他子句结合使用。参数可以直接在查询中提供,也可以从相关的参数集中获取。有关完整详细信息,请参见 Cypher 手册 → CALL 过程 中的文档。

创建过程

请确保您已阅读并遵循 设置插件项目 中的准备设置说明。

下面讨论的示例可在 GitHub 上的存储库 中找到。要快速开始,您可以 Fork 该存储库,并按照下面的指南使用代码。

首先,确定过程应该做什么,然后编写一个测试来证明它做得对。最后,编写一个通过测试的过程。

集成测试

测试依赖项包括 Neo4j HarnessJUnit。这些可用于编写过程的集成测试。测试应启动一个 Neo4j 实例,加载过程,并对其执行查询。

一个使用 JUnit 5 测试返回图中找到的关系类型的过程的示例。
package example;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Value;
import org.neo4j.harness.Neo4j;
import org.neo4j.harness.Neo4jBuilders;

import static org.assertj.core.api.Assertions.assertThat;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GetRelationshipTypesTests {

    private Driver driver;
    private Neo4j embeddedDatabaseServer;

    @BeforeAll
    void initializeNeo4j() {
        this.embeddedDatabaseServer = Neo4jBuilders.newInProcessBuilder()
                .withDisabledServer()
                .withProcedure(GetRelationshipTypes.class)
                .build();

        this.driver = GraphDatabase.driver(embeddedDatabaseServer.boltURI());
    }

    @AfterAll
    void closeDriver(){
        this.driver.close();
        this.embeddedDatabaseServer.close();
    }

    @AfterEach
    void cleanDb(){
        try(Session session = driver.session()) {
            session.run("MATCH (n) DETACH DELETE n");
        }
    }

    /**
     * We should be getting the correct values when there is only one type in each direction
     */
    @Test
    public void shouldReturnTheTypesWhenThereIsOneEachWay() {
        final String expectedIncoming = "INCOMING";
        final String expectedOutgoing = "OUTGOING";

        // In a try-block, to make sure we close the session after the test
        try(Session session = driver.session()) {

            //Create our data in the database.
            session.run(String.format("CREATE (:Person)-[:%s]->(:Movie {id:1})-[:%s]->(:Person)", expectedIncoming, expectedOutgoing));

            //Execute our procedure against it.
            Record record = session.run("MATCH (u:Movie {id:1}) CALL example.getRelationshipTypes(u) YIELD outgoing, incoming RETURN outgoing, incoming").single();

            //Get the incoming / outgoing relationships from the result
            assertThat(record.get("incoming").asList(Value::asString)).containsOnly(expectedIncoming);
            assertThat(record.get("outgoing").asList(Value::asString)).containsOnly(expectedOutgoing);
        }
    }
}

上一个示例使用 JUnit 5,它需要使用 org.neo4j.harness.junit.extension.Neo4jExtension。如果要在 Neo4j 4.x 或 5 中使用 JUnit 4,请改用 org.neo4j.harness.junit.rule.Neo4jRule

定义过程

测试就绪后,编写一个满足测试期望的过程。完整示例可在 Neo4j 过程模板 存储库中找到。

特别注意事项

  • 所有过程都使用 @Procedure 进行注解。

  • 过程注解可以接受三个可选参数:namemodeeager

    • name 用于为过程指定一个不同于默认生成名称(即 class.path.nameOfMethod)的名称。如果指定了 mode,则也必须指定 name

    • mode 用于声明过程执行的交互类型。如果过程尝试执行违反其模式的数据库操作,则会失败。默认 modeREAD。可用模式如下:

      • READ — 此过程仅对图执行读取操作。

      • WRITE — 此过程对图执行读取和写入操作。

      • SCHEMA — 此过程对模式执行操作,即创建和删除索引和约束。具有此模式的过程可以读取图数据,但不能写入。

      • DBMS — 此过程执行系统操作,例如用户管理和查询管理。具有此模式的过程无法读取或写入图数据。

    • eager 是一个布尔设置,默认为 false。如果设置为 true,Cypher 规划器会在调用过程之前和之后规划一个额外的 eager 操作。这在过程以可能与过程之前或之后的操作交互的方式更改数据库的情况下非常有用。例如:

      MATCH (n)
      WHERE n.key = 'value'
      WITH n
      CALL deleteNeighbours(n, 'FOLLOWS')

      此查询可以删除 Cypher 查询匹配的某些节点,并且 n.key 查找将失败。将此过程标记为 eager 可防止在 Cypher 代码中导致错误。但是,过程仍然可能通过尝试读取其先前已删除的实体来自我干扰。处理这种情况是过程作者的责任。

  • 过程的上下文,与过程要使用的每个资源相同,都用 @Context 进行注解。

在过程中发出错误信号的正确方法是抛出 RuntimeException

可注入资源

编写过程时,可以将一些资源从数据库注入到过程中。要注入这些资源,请使用 @Context 注解。可以注入的类包括:

  • Log

  • TerminationGuard

  • GraphDatabaseService

  • 事务

上述所有类都被认为是安全且面向未来的,并且不会损害数据库的安全性。一些不受支持(受限)的类也可以被注入,并且可能会在很少或没有通知的情况下进行更改。默认情况下不加载使用这些受限 API 编写的过程,您需要使用 dbms.security.procedures.unrestricted 来加载不安全的过程。有关此配置设置的更多信息,请参阅 操作手册 → 保护扩展

内存资源跟踪

过程框架的内存资源跟踪 API 可供预览。Neo4j 的未来版本可能会对此 API 进行重大更改。

如果您的过程或函数分配了大量堆内存,您可以注册分配以计入配置的事务限制,更多信息请参阅 操作手册 → 限制事务内存使用。这可以帮助您避免导致数据库重启的 OutOfMemory 错误。内存分配也会显示在查询配置文件中。

为此,您需要在过程/函数类中注入 org.neo4j.procedure.memory.ProcedureMemory 作为字段。ProcedureMemory 具有各种方法,允许您注册分配。例如(有关完整参考,请参阅 javadoc):

  • ProcedureMemoryTracker newTracker() 创建一个新的内存资源跟踪器,该跟踪器绑定到当前事务。

  • HeapEstimator heapEstimator() 估计类和实例的堆大小。

  • HeapTrackingCollectionFactory collections() 允许您创建具有内置内存跟踪其内部结构的集合。

实现内存资源跟踪通常既困难又耗时。以下是一些值得牢记的注意事项和警告:

  • 限制内存管理的范围。只关注内存中可能显著增长的部分,而忽略微小的低估。

  • 注意因多次注册同一实例的分配而导致的过高估计。如果这是一个问题,您可以添加引用计数或其他机制来避免过高估计。

  • 通常在实例分配之前不知道其大小,这可能导致您在分配完成后才注册分配。内存跟踪器实现通过始终在内部内存池中预先注册一定量的内存来尝试防止这种情况。

  • 在 Java 中,要知道实例何时被垃圾回收是很麻烦的。通常,您会在内存可能被垃圾回收时注册内存的释放。为了解决这个问题,内存跟踪器可能会在内部选择不立即注册内存的释放。

  • 测试内存资源跟踪可能很困难。一种方法是使用第三方库,例如 JAMM (Java Agent for Memory Measurements),并断言估算值对于给定输入足够接近。

用户定义过程中内存资源跟踪的基本示例。
package org.example;

import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.memory.ProcedureMemory;

import java.util.Arrays;
import java.util.stream.Stream;

public class MyProcedures {

    @Context
    public ProcedureMemory memory;

    record Output(Long value) {}

    @Procedure("org.example.memoryHungryRange")
    public Stream<Output> memoryHungryRange(@Name("size") int size) {
        final var tracker = memory.newTracker();

        // Register the allocation of the long array below
        tracker.allocateHeap(memory.heapEstimator().sizeOfLongArray(size));
        // The actual allocation
        final var result = new long[size];

        for (int i = 0; i < size; i++) result[i] = i;

        return Arrays.stream(result)
                .mapToObj(Output::new)
                // Release all registered allocations when the stream is closed
                .onClose(tracker::close);
    }
}
© . All rights reserved.