使用响应式流控制结果流

在响应式流中,消费者决定从查询中消费记录的速度,驱动程序依次管理从服务器请求记录的速度。

一个用例示例是应用程序从 Neo4j 服务器获取记录,并在每个记录上执行一些非常耗时的后处理。如果允许服务器在记录可用时立即将它们推送到客户端,则客户端可能会被大量条目淹没,而其处理仍在落后。响应式 API 确保接收方不会被迫缓冲任意数量的数据。

驱动程序的响应式实现位于 reactivestreams 子包 中,并依赖于 reactor-core(来自 Project Reactor)。

对于已经使用响应式编程风格且其需求只能通过响应式工作流来满足的应用程序,建议使用响应式 API。对于所有其他情况,建议使用 同步异步 API。

安装依赖项

要使用响应式功能,您首先需要将相关依赖项添加到您的项目中(参考 Reactor → 参考 → 获取 Reactor)。

  1. pom.xmldependencyManagement 部分中添加 Reactor 的 BOM。请注意,这除了常规的 dependencies 部分之外。如果 pom 中已经存在 dependencyManagement 部分,则只需添加内容。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2023.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
  2. reactor-core 依赖项添加到 dependencies 部分。请注意,版本标签被省略了(它从 Reactor 的 BOM 中获取)。

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>

响应式查询示例

基本驱动程序的概念与同步情况相同,但查询是通过 ReactiveSession 运行的,与查询相关的对象具有响应式对应项和前缀。

使用响应式会话的管理事务

管理事务 .executeRead() 示例
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(  (1)
                Mono.just(driver.session(  (2)
                    ReactiveSession.class,  (3)
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono.fromDirect(rxSession.executeRead(  (4)
                    tx -> Mono
                        .fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x"))  (5)
                        .flatMapMany(ReactiveResult::records)  (6)
                )),
                ReactiveSession::close  (7)
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();  (8)
            System.out.println(values);
        }
    }
}
1 Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) 用于创建新会话,使用它运行查询,最后关闭它。它将确保资源在需要的时间内处于活动状态,并允许指定在结束时要执行的清理操作。
2 .usingWhen()Publisher 的形式接受一个资源提供者,因此会话创建被包装在一个 Mono.just() 调用中,它从任何值生成一个 Mono
3 会话创建类似于异步情况,并且 相同的配置方法适用。区别在于第一个参数必须是 ReactiveSession.class,返回值是 ReactiveSession 对象。
4 方法 ReactiveSession.executeRead() 运行读取事务并返回一个包含调用者返回值的 PublisherMono.fromDirect() 将其转换为 Mono
5 方法 tx.run() 返回一个 Publisher<ReactiveResult>Mono.fromDirect() 将其转换为 Mono
6 在返回最终结果之前,Mono.flatMapMany() 从结果中检索记录并将其作为新的 Flux 返回。
7 最终清理会关闭会话。
8 为了显示响应式工作流的结果,.block() 等待流程完成,以便可以打印值。在实际应用程序中,您不会阻塞,而是将记录发布者转发到您选择的框架,该框架将以有意义的方式处理它们。
您可以在 workerClosure 内通过多次调用 executeRead/Write() 在同一个响应式会话中运行多个查询。

使用响应式会话的隐式事务

以下示例与前一个示例非常相似,只是它使用了 隐式事务

隐式事务 .run() 示例
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(
                Mono.just(driver.session(
                    ReactiveSession.class,
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono
                    .fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
                    .flatMapMany(ReactiveResult::records),
                ReactiveSession::close
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();
            System.out.println(values);
        }
    }
}

始终延迟会话创建

重要的是要记住,在响应式编程中,发布者只有在订阅者附加到它时才会生效。发布者只是您异步过程的抽象描述,但只有订阅行为才会触发整个链中的数据流。

出于这个原因,始终要注意将会话创建/销毁作为此链的一部分,而不是与查询发布者链分开创建会话。这样做可能会导致许多打开的会话,没有一个会话在工作,并且所有会话都在等待发布者使用它们,这可能会耗尽应用程序可用会话的数量。前面的示例使用 Flux.usingWhen() 来解决这个问题。

不良做法示例 - 会话已创建但无人使用
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!

词汇表

LTS

长期支持版本是指保证支持数年的一类版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。

Aura

Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但适用于图形。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含了(许多)函数的库,这些函数无法在 Cypher 本身中轻松表达。

Bolt

Bolt 是 Neo4j 实例与驱动程序之间交互使用的协议。默认情况下,它监听端口 7687。

ACID

原子性、一致性、隔离性、持久性(ACID)是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障的情况下也能保持准确和一致。

最终一致性

如果数据库保证所有集群成员最终都会存储数据的最新版本(在某个时间点),那么它就是最终一致的。

因果一致性

如果数据库保证所有集群成员以相同的顺序看到读写查询,那么它就是因果一致的。这比最终一致性更强。

NULL

空标记不是一种类型,而是值缺失的占位符。有关详细信息,请参见 Cypher → 使用 null

事务

事务是一项工作单元,它要么全部提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但这些步骤必须全部成功或被撤消,以避免从一个账户中扣除资金但未将其添加到另一个账户中。

背压

背压是一种阻碍数据流的力量。它确保客户端不会被它无法处理的速度更快的数据淹没。

事务函数

事务函数是由 executeReadexecuteWrite 调用执行的回调函数。驱动程序在服务器发生故障的情况下会自动重新执行回调函数。

驱动程序

一个 Driver 对象保存了建立与 Neo4j 数据库连接所需的详细信息。