使用响应式流控制结果流
在响应式流中,消费者决定从查询中消费记录的速度,驱动程序依次管理从服务器请求记录的速度。
一个用例示例是应用程序从 Neo4j 服务器获取记录,并在每个记录上执行一些非常耗时的后处理。如果允许服务器在记录可用时立即将它们推送到客户端,则客户端可能会被大量条目淹没,而其处理仍在落后。响应式 API 确保接收方不会被迫缓冲任意数量的数据。
驱动程序的响应式实现位于 reactivestreams
子包 中,并依赖于 reactor-core
包(来自 Project Reactor)。
安装依赖项
要使用响应式功能,您首先需要将相关依赖项添加到您的项目中(参考 Reactor → 参考 → 获取 Reactor)。
-
在
pom.xml
的dependencyManagement
部分中添加 Reactor 的 BOM。请注意,这除了常规的dependencies
部分之外。如果pom
中已经存在dependencyManagement
部分,则只需添加内容。<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
-
将
reactor-core
依赖项添加到dependencies
部分。请注意,版本标签被省略了(它从 Reactor 的 BOM 中获取)。<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency>
响应式查询示例
基本驱动程序的概念与同步情况相同,但查询是通过 ReactiveSession
运行的,与查询相关的对象具有响应式对应项和前缀。
使用响应式会话的管理事务
.executeRead()
示例package demo;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
public class App {
public static void main(String... args) {
final String dbUri = "<URI for Neo4j database>";
final String dbUser = "<Username>";
final String dbPassword = "<Password>";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
Flux<Record> records = Flux.usingWhen( (1)
Mono.just(driver.session( (2)
ReactiveSession.class, (3)
SessionConfig.builder().withDatabase("neo4j").build()
)),
rxSession -> Mono.fromDirect(rxSession.executeRead( (4)
tx -> Mono
.fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x")) (5)
.flatMapMany(ReactiveResult::records) (6)
)),
ReactiveSession::close (7)
);
// block for demonstration purposes
List<Value> values = records.map(record -> record.get("x")).collectList().block(); (8)
System.out.println(values);
}
}
}
1 | Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) 用于创建新会话,使用它运行查询,最后关闭它。它将确保资源在需要的时间内处于活动状态,并允许指定在结束时要执行的清理操作。 |
2 | .usingWhen() 以 Publisher 的形式接受一个资源提供者,因此会话创建被包装在一个 Mono.just() 调用中,它从任何值生成一个 Mono 。 |
3 | 会话创建类似于异步情况,并且 相同的配置方法适用。区别在于第一个参数必须是 ReactiveSession.class ,返回值是 ReactiveSession 对象。 |
4 | 方法 ReactiveSession.executeRead() 运行读取事务并返回一个包含调用者返回值的 Publisher ,Mono.fromDirect() 将其转换为 Mono 。 |
5 | 方法 tx.run() 返回一个 Publisher<ReactiveResult> ,Mono.fromDirect() 将其转换为 Mono 。 |
6 | 在返回最终结果之前,Mono.flatMapMany() 从结果中检索记录并将其作为新的 Flux 返回。 |
7 | 最终清理会关闭会话。 |
8 | 为了显示响应式工作流的结果,.block() 等待流程完成,以便可以打印值。在实际应用程序中,您不会阻塞,而是将记录发布者转发到您选择的框架,该框架将以有意义的方式处理它们。 |
您可以在 workerClosure 内通过多次调用 executeRead/Write() 在同一个响应式会话中运行多个查询。 |
使用响应式会话的隐式事务
以下示例与前一个示例非常相似,只是它使用了 隐式事务。
.run()
示例package demo;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
public class App {
public static void main(String... args) {
final String dbUri = "<URI for Neo4j database>";
final String dbUser = "<Username>";
final String dbPassword = "<Password>";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
Flux<Record> records = Flux.usingWhen(
Mono.just(driver.session(
ReactiveSession.class,
SessionConfig.builder().withDatabase("neo4j").build()
)),
rxSession -> Mono
.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
.flatMapMany(ReactiveResult::records),
ReactiveSession::close
);
// block for demonstration purposes
List<Value> values = records.map(record -> record.get("x")).collectList().block();
System.out.println(values);
}
}
}
始终延迟会话创建
重要的是要记住,在响应式编程中,发布者只有在订阅者附加到它时才会生效。发布者只是您异步过程的抽象描述,但只有订阅行为才会触发整个链中的数据流。
出于这个原因,始终要注意将会话创建/销毁作为此链的一部分,而不是与查询发布者链分开创建会话。这样做可能会导致许多打开的会话,没有一个会话在工作,并且所有会话都在等待发布者使用它们,这可能会耗尽应用程序可用会话的数量。前面的示例使用 Flux.usingWhen()
来解决这个问题。
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!
词汇表
- LTS
-
长期支持版本是指保证支持数年的一类版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但适用于图形。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含了(许多)函数的库,这些函数无法在 Cypher 本身中轻松表达。
- Bolt
-
Bolt 是 Neo4j 实例与驱动程序之间交互使用的协议。默认情况下,它监听端口 7687。
- ACID
-
原子性、一致性、隔离性、持久性(ACID)是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障的情况下也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员最终都会存储数据的最新版本(在某个时间点),那么它就是最终一致的。
- 因果一致性
-
如果数据库保证所有集群成员以相同的顺序看到读写查询,那么它就是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是值缺失的占位符。有关详细信息,请参见 Cypher → 使用
null
。 - 事务
-
事务是一项工作单元,它要么全部提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但这些步骤必须全部成功或被撤消,以避免从一个账户中扣除资金但未将其添加到另一个账户中。
- 背压
-
背压是一种阻碍数据流的力量。它确保客户端不会被它无法处理的速度更快的数据淹没。
- 事务函数
-
事务函数是由
executeRead
或executeWrite
调用执行的回调函数。驱动程序在服务器发生故障的情况下会自动重新执行回调函数。 - 驱动程序
-
一个
Driver
对象保存了建立与 Neo4j 数据库连接所需的详细信息。