反压

本节介绍 4.x 驱动程序中的反压。

Neo4j 4.0 引入了客户端反压。客户端反压的概念是客户端与远程服务器通信,告知其能够处理多少数据,并且仅在准备好使用更多数据时才请求额外数据。

反压概念自然兼容响应式编程。因此,在 4.0 驱动程序版本中,所有语言驱动程序都添加了响应式 API 支持。

Java 驱动程序的响应式 API 公开了原始的发布者-订阅者 API,该 API 由响应式流定义。Java 驱动程序的响应式 API 与响应式库一起使用,例如Project Reactor和/或RxJava

.NET 使用内置的System.Reactive。JavaScript 驱动程序使用RxJs库。

这些库属于同一个响应式框架ReactiveX

要使用驱动程序的响应式 API,需要具备响应式编程的初步知识。有关如何使用 Neo4j 响应式驱动程序 API 的详细信息,请参阅Neo4j 驱动程序手册 4.0

但是,反压不仅限于驱动程序的响应式 API。所有其他 API(例如简单 API 和异步 API)在处理查询执行结果时默认情况下都启用了反压。

表 1. 不同语言驱动程序会话 API 中反压的实现方式
简单 API 异步 API 响应式 API

Java 驱动程序

记录缓冲区

记录缓冲区

原始发布者-订阅者 API

.NET 驱动程序

记录缓冲区

记录缓冲区

记录缓冲区

Javascript 驱动程序

不适用

记录缓冲区

记录缓冲区

Bolt 4.0 和记录缓冲区中的反压

Neo4j 4.0 服务器和驱动程序实现了 Bolt 4.0。此 Bolt 版本引入的主要功能之一是以批次方式提取查询结果(记录)。在以前的 Bolt 版本中,完整的 结果集始终以一个批次从服务器拉取到驱动程序。Bolt 4.0 使您能够以多个批次提取这些结果,其中每个批次的大小可以通过 fetchSize 定义。默认情况下,驱动程序使用 1000 条记录的 fetchSize

随着记录批处理的引入,驱动程序可以实现客户端反压。对于每个结果,驱动程序都会保留一个未使用的记录缓冲区。缓冲区大小与每个批次的 fetchSize 相同。当缓冲区超过 70% 充满时,从服务器拉取记录的操作会暂停,并且当缓冲区低于 30% 充满时,记录拉取操作会重新启用。使用默认的 1000 条记录的 fetchSize,当缓冲区中超过 700 条记录时,记录拉取操作会暂停,并且当缓冲区降至 300 条以下时,记录拉取操作会恢复。

示例 1. 在驱动程序上设置默认的 fetchSize,并在会话上更改默认值。
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
...

Config config = Config.builder().withFetchSize( 2000 ).build();
Driver driver = GraphDatabase.driver( uri, AuthTokens.basic( user, password ), config );

SessionConfig sessionConfig = SessionConfig.builder()
                                          .withDatabase( "neo4j" )
                                          .withFetchSize( 100 )
                                          .build();
try ( Session session = driver.session( sessionConfig ) ) {...}

Java 驱动程序响应式 API

Java 驱动程序的响应式 API 公开了非常底层的发布者-订阅者 API。因此,它默认情况下不会执行任何类型的反压。相反,驱动程序用户应使用响应式框架来利用反压。根据响应式框架的不同,该框架可能会通过暂停从 Neo4j 服务器拉取数据或在处理的数据过多时丢弃数据来应用反压。