用户定义过程

用户定义过程是一种机制,使您能够通过编写自定义代码来扩展 Neo4j,该代码可以直接从 Cypher 调用。过程可以接受参数,对数据库执行操作并返回结果。有关用户定义过程、函数和聚合函数之间比较的信息,请参阅Neo4j 自定义代码

需要在系统数据库上执行的用户定义过程需要包含注释@SystemProcedure,否则它们将被归类为用户数据库过程。

调用过程

要调用用户定义过程,请使用 Cypher 的CALL子句。过程名称必须是完全限定的,因此在包org.neo4j.examples中定义的名为findDenseNodes的过程可以使用以下方式调用:

CALL org.neo4j.examples.findDenseNodes(1000)

CALL可以是 Cypher 语句中的唯一子句,也可以与其他子句组合使用。参数可以直接在查询中提供,也可以从关联的参数集中获取。有关完整详细信息,请参阅Cypher 手册 → CALL过程中的文档。

创建过程

确保您已阅读并遵循了设置插件项目中的准备设置说明。

下面讨论的示例可在GitHub 上的存储库中找到。要快速入门,您可以派生存储库并在按照以下指南的过程中使用代码。

首先,确定过程应该做什么,然后编写一个测试来证明它正确地执行了操作。最后,编写一个通过测试的过程。

集成测试

测试依赖项包括Neo4j HarnessJUnit。这些可用于编写过程的集成测试。测试应启动 Neo4j 实例,加载过程并对其执行查询。

一个使用 JUnit 5 测试返回图中找到的关系类型的过程的示例。
package example;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Value;
import org.neo4j.harness.Neo4j;
import org.neo4j.harness.Neo4jBuilders;

import static org.assertj.core.api.Assertions.assertThat;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GetRelationshipTypesTests {

    private Driver driver;
    private Neo4j embeddedDatabaseServer;

    @BeforeAll
    void initializeNeo4j() {
        this.embeddedDatabaseServer = Neo4jBuilders.newInProcessBuilder()
                .withDisabledServer()
                .withProcedure(GetRelationshipTypes.class)
                .build();

        this.driver = GraphDatabase.driver(embeddedDatabaseServer.boltURI());
    }

    @AfterAll
    void closeDriver(){
        this.driver.close();
        this.embeddedDatabaseServer.close();
    }

    @AfterEach
    void cleanDb(){
        try(Session session = driver.session()) {
            session.run("MATCH (n) DETACH DELETE n");
        }
    }

    /**
     * We should be getting the correct values when there is only one type in each direction
     */
    @Test
    public void shouldReturnTheTypesWhenThereIsOneEachWay() {
        final String expectedIncoming = "INCOMING";
        final String expectedOutgoing = "OUTGOING";

        // In a try-block, to make sure we close the session after the test
        try(Session session = driver.session()) {

            //Create our data in the database.
            session.run(String.format("CREATE (:Person)-[:%s]->(:Movie {id:1})-[:%s]->(:Person)", expectedIncoming, expectedOutgoing));

            //Execute our procedure against it.
            Record record = session.run("MATCH (u:Movie {id:1}) CALL example.getRelationshipTypes(u) YIELD outgoing, incoming RETURN outgoing, incoming").single();

            //Get the incoming / outgoing relationships from the result
            assertThat(record.get("incoming").asList(Value::asString)).containsOnly(expectedIncoming);
            assertThat(record.get("outgoing").asList(Value::asString)).containsOnly(expectedOutgoing);
        }
    }
}

前面的示例使用 JUnit 5,它需要使用org.neo4j.harness.junit.extension.Neo4jExtension。如果要将 JUnit 4 与 Neo4j 4.x 或 5 一起使用,请改用org.neo4j.harness.junit.rule.Neo4jRule

定义过程

有了测试,编写一个满足测试期望的过程。完整示例可在Neo4j 过程模板存储库中找到。

需要注意的具体事项

  • 所有过程都使用@Procedure进行注释。

  • 过程注释可以带三个可选参数:namemodeeager

    • name用于为过程指定与默认生成的名称不同的名称,该名称为class.path.nameOfMethod。如果指定了mode,则也必须指定name

    • mode用于声明过程执行的交互类型。如果过程尝试执行违反其模式的数据库操作,则会失败。默认modeREAD。以下模式可用

      • READ — 此过程仅对图执行读取操作。

      • WRITE — 此过程对图执行读取和写入操作。

      • SCHEMA — 此过程对模式执行操作,即创建和删除索引和约束。具有此模式的过程可以读取图数据,但不能写入。

      • DBMS — 此过程执行系统操作,例如用户管理和查询管理。具有此模式的过程无法读取或写入图数据。

    • eager是一个布尔设置,默认为false。如果将其设置为true,则 Cypher 计划器将在调用过程之前和之后计划一个额外的eager操作。这在过程以可能与过程之前或之后的操作交互的方式更改数据库的情况下很有用。例如

      MATCH (n)
      WHERE n.key = 'value'
      WITH n
      CALL deleteNeighbours(n, 'FOLLOWS')

      此查询可以删除 Cypher 查询匹配的一些节点,并且n.key查找将失败。将此过程标记为eager可以防止这导致 Cypher 代码出错。但是,过程仍然可以通过尝试读取之前已删除的实体来干扰自身。处理这种情况是过程作者的责任。

  • 过程的上下文(与过程想要使用的每个资源相同)使用@Context进行注释。

从过程内部发出错误的正确方法是抛出RuntimeException

可注入资源

在编写过程时,某些资源可以从数据库注入到过程中。要注入这些资源,请使用@Context注释。可以注入的类包括

  • 日志

  • 终止防护

  • GraphDatabaseService

  • 事务

所有上述类都被认为是安全且面向未来的,不会危及数据库的安全。还可以注入一些不受支持(受限)的类,并且可以更改这些类,而无需或只需很少的通知。编写为使用这些受限 API 的过程不会默认加载,并且您需要使用dbms.security.procedures.unrestricted加载不安全的过程。有关此配置设置的更多信息,请阅读操作手册 → 保护扩展

内存资源跟踪

过程框架的内存资源跟踪 API 可用于预览。Neo4j 的未来版本可能会对此 API 进行重大更改。

如果您的过程或函数分配了大量堆内存,则可以注册分配以计入已配置的事务限制,请参阅操作手册 → 限制事务内存使用以获取更多信息。这使您可以避免导致数据库重新启动的OutOfMemory错误。内存分配也会显示在查询配置文件中。

为此,您需要将org.neo4j.procedure.memory.ProcedureMemory作为字段注入到您的过程/函数类中。ProcedureMemory具有各种方法,允许您注册分配。例如(有关完整参考,请参阅 javadocs)

  • ProcedureMemoryTracker newTracker()创建一个新的内存资源跟踪器,该跟踪器绑定到当前事务。

  • HeapEstimator heapEstimator()估计类和实例的堆大小。

  • HeapTrackingCollectionFactory collections()允许您创建具有其内部结构的内置内存跟踪的集合。

实现内存资源跟踪通常既困难又耗时。以下是一些值得牢记的注意事项和警告。

  • 限制内存管理的范围。只关注内存中可能显著增长的部分,忽略轻微的低估。

  • 注意多次注册同一实例的分配可能导致高估。如果这是个问题,您可以添加引用计数或其他机制来避免高估。

  • 在实例分配之前通常不知道其大小,这可能导致您在分配完成后才注册分配。内存跟踪器实现尝试通过始终在内部内存池中预注册一定量的内存来防止这种情况。

  • 在 Java 中,知道实例何时被垃圾回收很麻烦。通常,您在内存可能被垃圾回收的点注册内存释放。为了解决这个问题,内存跟踪器在内部可能会选择不立即注册内存释放。

  • 测试内存资源跟踪可能很困难。一种方法是使用第三方库,例如 JAMM(Java Agent for Memory Measurements),并断言对于某些给定输入,估计值足够接近。

用户定义过程中内存资源跟踪的基本示例。
package org.example;

import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.memory.ProcedureMemory;

import java.util.Arrays;
import java.util.stream.Stream;

public class MyProcedures {

    @Context
    public ProcedureMemory memory;

    record Output(Long value) {}

    @Procedure("org.example.memoryHungryRange")
    public Stream<Output> memoryHungryRange(@Name("size") int size) {
        final var tracker = memory.newTracker();

        // Register the allocation of the long array below
        tracker.allocateHeap(memory.heapEstimator().sizeOfLongArray(size));
        // The actual allocation
        final var result = new long[size];

        for (int i = 0; i < size; i++) result[i] = i;

        return Arrays.stream(result)
                .mapToObj(Output::new)
                // Release all registered allocations when the stream is closed
                .onClose(tracker::close);
    }
}