使用响应式流控制结果流
在响应式流中,消费者决定从查询中消费记录的速度,而驱动程序则相应地管理从服务器请求记录的速度。
一个用例示例是应用程序从 Neo4j 服务器获取记录,并对每条记录进行非常耗时的后处理。如果允许服务器在其记录可用时立即将它们推送到客户端,则客户端可能会被大量条目淹没,而其处理仍然滞后。响应式 API 确保接收方不会被迫缓冲任意数量的数据。
驱动程序提供了两种响应式功能实现
-
org.neo4j.driver.reactivestreams
使用 Java 的响应式流 API。它依赖于reactor-core
包,该包来自 Project Reactor。 -
org.neo4j.driver.reactive
使用 Java 9 中引入的原生 Java Flow API(例如Flow.Publisher
)。
本页示例使用 org.neo4j.driver.reactivestreams
。
安装依赖项
要使用响应式功能,您需要首先将相关依赖项添加到您的项目中(请参阅 Reactor → 参考 → 获取 Reactor)。
-
将 Reactor 的 BOM 添加到您
pom.xml
的dependencyManagement
部分。请注意,这是在常规dependencies
部分之外添加的。如果您的 pom 中已存在dependencyManagement
部分,则仅添加其内容。<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
-
将
reactor-core
依赖项添加到dependencies
部分。请注意,版本标签被省略(它将从 Reactor 的 BOM 中获取)。<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency>
响应式查询示例
驱动程序的基本概念与同步情况相同,但查询通过 ReactiveSession
运行,并且与查询相关的对象具有响应式对应物和前缀。
响应式会话的托管事务
.executeRead()
示例package demo;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
public class App {
public static void main(String... args) {
final String dbUri = "{neo4j-database-uri}";
final String dbUser = "{neo4j-username}";
final String dbPassword = "{neo4j-password}";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
Flux<Record> records = Flux.usingWhen( (1)
Mono.just(driver.session( (2)
ReactiveSession.class, (3)
SessionConfig.builder().withDatabase("neo4j").build()
)),
rxSession -> Mono.fromDirect(rxSession.executeRead( (4)
tx -> Mono
.fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x")) (5)
.flatMapMany(ReactiveResult::records) (6)
)),
ReactiveSession::close (7)
);
// block for demonstration purposes
List<Value> values = records.map(record -> record.get("x")).collectList().block(); (8)
System.out.println(values);
}
}
}
1 | Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) 用于创建新会话、使用它运行查询并最终关闭它。它将确保资源在需要时保持活动状态,并允许指定最终要执行的清理操作。有关此模式的更多信息,请参阅始终推迟会话创建。 |
2 | .usingWhen() 接受一个 Publisher 形式的资源提供器,因此会话创建被封装在 Mono.just() 调用中,该调用从任何值生成一个 Mono 。 |
3 | 会话创建与异步情况类似,并且适用相同的配置方法。区别在于第一个参数必须是 ReactiveSession.class ,并且返回值是 ReactiveSession 对象。 |
4 | 方法 ReactiveSession.executeRead() 运行读取事务并返回一个带有被调用者返回值的 Publisher ,Mono.fromDirect() 将其转换为 Mono 。 |
5 | 方法 tx.run() 返回一个 Publisher<ReactiveResult> ,Mono.fromDirect() 将其转换为 Mono 。 |
6 | 在返回最终结果之前,Mono.flatMapMany() 从结果中检索记录并将其作为新的 Flux 返回。 |
7 | 最终清理会关闭会话。 |
8 | 为了显示响应式工作流的结果,.block() 会等待流完成,以便可以打印值。在实际应用程序中,您不会阻塞,而是将记录发布者转发给您选择的框架,该框架将以有意义的方式处理它们。 |
您可以通过对 workerClosure 中的 executeRead/Write() 进行多次调用,在同一个响应式会话中运行多个查询。 |
响应式会话的隐式事务
以下示例与上一个非常相似,只是它使用隐式事务。
.run()
示例package demo;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
public class App {
public static void main(String... args) {
final String dbUri = "{neo4j-database-uri}";
final String dbUser = "{neo4j-username}";
final String dbPassword = "{neo4j-password}";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
Flux<Record> records = Flux.usingWhen(
Mono.just(driver.session(
ReactiveSession.class,
SessionConfig.builder().withDatabase("neo4j").build()
)),
rxSession -> Mono
.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
.flatMapMany(ReactiveResult::records),
ReactiveSession::close
);
// block for demonstration purposes
List<Value> values = records.map(record -> record.get("x")).collectList().block();
System.out.println(values);
}
}
}
始终推迟会话创建
在响应式编程中,Publisher 直到 Subscriber 附加到它时才生效:Publisher 只是对您的异步过程的抽象描述,但只有订阅行为才会触发整个链中的数据流。
因此,请注意将会话的创建/销毁作为此链的一部分,而不是独立于查询 Publisher 链创建会话。否则可能导致许多开放会话,它们都不工作且都在等待 Publisher 使用,这可能会耗尽您应用程序的可用会话数量。前面的示例使用 Flux.usingWhen()
来解决此问题。
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!
词汇表
- LTS
-
长期支持 (LTS) 版本是指保证多年支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,用于从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个函数库(包含许多函数),这些函数本身无法在 Cypher 中轻松表达。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。
- ACID
-
原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)(ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使发生故障也能保持准确和一致。
- 最终一致性
-
如果数据库提供所有集群成员将在某个时间点存储最新版本数据的保证,则该数据库是最终一致的。
- 因果一致性
-
如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。
- NULL
-
空值标记不是一种类型,而是表示值不存在的占位符。有关更多信息,请参阅Cypher → 使用
null
。 - 事务
-
事务是一个工作单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免从一个账户扣除资金但未添加到另一个账户。
- 背压
-
背压是一种与数据流相反的力。它确保客户端不会被超出其处理能力的数据淹没。
- 事务函数
-
事务函数是
executeRead
或executeWrite
调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行该回调。 - 驱动程序
-
一个
Driver
对象包含与 Neo4j 数据库建立连接所需的详细信息。