会话 API
本节详细介绍了驱动程序提供的会话 API。
简单会话
简单会话为 Cypher 执行提供了“经典”的 阻塞式 API。通常,简单会话提供了最简单的编程方式,因为 API 调用以严格的顺序执行。
生命周期
会话生命周期从会话构造开始,到会话关闭结束。在支持上下文块的语言中,简单会话通常在上下文块内作用域;这确保了它们被正确关闭,并且任何底层连接都被释放且不泄漏。
using (var session = driver.Session(...)) {
// transactions go here
}
会话可以通过多种不同方式配置。这通过在会话构造函数中提供配置来实现。有关更多详细信息,请参阅会话配置。
事务函数
事务函数用于包含事务性工作单元。这种形式的事务需要最少的样板代码,并允许清晰地分离数据库查询和应用程序逻辑。
事务函数也很受欢迎,因为它们封装了重试逻辑,并在将单个服务器实例替换为集群时提供了最大程度的灵活性。
事务函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中适当的服务器。如果您在单实例环境中操作,此路由没有影响。但如果您稍后选择采用集群环境,它会提供灵活性。
在编写事务函数之前,重要的是要确保它被设计为幂等的。这是因为如果初始运行失败,函数可能会被执行多次。
在事务函数中获得的任何查询结果都应在该函数内部使用,因为连接绑定的资源在超出范围时无法正确管理。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
事务函数是包含事务性工作单元的推荐形式. 当事务失败时,将调用驱动程序的重试逻辑。对于几种失败情况,事务可以立即在不同的服务器上重试。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。 |
ExecuteRead 和 ExecuteWrite 方法已取代 ReadTransaction 和 WriteTransaction ,后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。 |
public void AddPerson(string name)
{
using var session = Driver.Session();
session.ExecuteWrite(tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume());
}
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序自行处理。
自动提交事务有以下用途
-
简单用例,例如学习 Cypher 或编写一次性脚本时。
-
诸如批量数据加载操作之类的操作,其中驱动程序无法知道已提交的状态,因此无法安全地请求重试。在这种情况下,操作员将不得不执行
retry
或undo
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交了什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public void AddPerson(string name)
{
using var session = Driver.Session();
session.Run("CREATE (a:Person {name: $name})", new { name });
}
消费结果
查询结果通常作为记录流进行消费。驱动程序提供了迭代该流的方法。
public List<string> GetPeople()
{
using var session = Driver.Session();
return session.ExecuteRead(
tx =>
{
var result = tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return result.Select(record => record[0].As<string>()).ToList();
});
}
保留结果
在一个会话中,一次只能有一个结果流处于活动状态。因此,如果一个查询的结果在另一个查询执行之前未完全消费,则第一个结果的其余部分将自动缓冲在结果对象中。
此缓冲区为结果提供了暂存点,并将结果处理分为获取(从网络移动到缓冲区)和消费(从缓冲区移动到应用程序)。
对于大量结果,结果缓冲区可能需要大量内存。 因此,建议尽可能按顺序消费结果。 |
客户端应用程序可以选择通过显式保留结果来控制更高级的查询模式。当需要保存结果以供将来处理时,这种显式保留也可能很有用。驱动程序为此过程提供支持,如下例所示
public int AddEmployees(string companyName)
{
using var session = Driver.Session();
var persons = session.ExecuteRead(tx => tx.Run("MATCH (a:Person) RETURN a.name AS name").ToList());
return persons.Sum(
person => session.ExecuteWrite(
tx =>
{
var result = tx.Run(
"MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
new { person_name = person["name"].As<string>(), company_name = companyName });
result.Consume();
return 1;
}));
}
异步会话
异步会话提供了一个 API,其中函数调用通常返回可用的对象,例如 Future。这允许客户端应用程序在异步框架内工作并利用协作式多任务处理。
生命周期
会话生命周期从会话构造开始。然后会话一直存在直到其关闭,这通常设置在其包含的查询结果已被消费之后发生。
会话可以通过多种不同方式配置。这通过在会话构造函数中提供配置来实现。
有关更多详细信息,请参阅会话配置。
事务函数
事务函数是包含事务性工作单元的推荐形式。这种事务形式需要最少的样板代码,并允许清晰地分离数据库查询和应用程序逻辑。事务函数也很受欢迎,因为它们封装了重试逻辑,并在将单个服务器实例替换为集群时提供了最大程度的灵活性。
函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中适当的服务器。如果您在单实例环境中,此路由没有影响,但如果您稍后选择采用集群环境,它会提供灵活性。
在编写事务函数之前,重要的是要确保事务函数执行的任何副作用都应设计为幂等。这是因为如果初始运行失败,函数可能会被执行多次。
在事务函数中获得的任何查询结果都应在该函数内部使用,因为连接绑定的资源在超出范围时无法正确管理。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序的重试逻辑。对于几种失败情况,事务可以立即在不同的服务器上重试。 这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。重试逻辑可以在创建会话时配置。 |
ExecuteReadAsync 和 ExecuteWriteAsync 方法已取代 ReadTransactionAsync 和 WriteTransactionAsync ,后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。 |
public async Task<List<string>> PrintAllProducts()
{
await using var session = Driver.AsyncSession();
// Wrap whole operation into an managed transaction and
// get the results back.
return await session.ExecuteReadAsync(
async tx =>
{
var products = new List<string>();
// Send cypher query to the database
var reader = await tx.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
// Each current read in buffer can be reached via Current
{
products.Add(reader.Current[0].ToString());
}
return products;
});
}
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序自行处理。
自动提交事务有两个用途
-
简单用例,例如学习 Cypher 或编写一次性脚本时。
-
诸如批量数据加载操作之类的操作,其中驱动程序无法知道已提交的状态,因此无法安全地请求重试。在这种情况下,操作员将不得不执行
retry
或undo
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交了什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public async Task<List<string>> ReadProductTitles()
{
var records = new List<string>();
await using var session = Driver.AsyncSession();
// Send cypher query to the database.
// The existing IResult interface implements IEnumerable
// and does not play well with asynchronous use cases. The replacement
// IResultCursor interface is returned from the RunAsync
// family of methods instead and provides async capable methods.
var reader = await session.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
// Each current read in buffer can be reached via Current
{
records.Add(reader.Current[0].ToString());
}
return records;
}
组合事务
驱动程序提供了一种语言惯用的方式,可以在单个异步会话中组合多个事务。
public async Task<int?> EmployEveryoneInCompany(string companyName)
{
await using var session = Driver.AsyncSession();
var names = await session.ExecuteReadAsync(
async tx =>
{
var cursor = await tx.RunAsync("MATCH (a:Person) RETURN a.name AS name");
var people = await cursor.ToListAsync();
return people.Select(person => person["name"].As<string>());
});
return await session.ExecuteWriteAsync(
async tx =>
{
var relationshipsCreated = new List<int>();
foreach (var personName in names)
{
var cursor = await tx.RunAsync(
"MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
new
{
person_name = personName,
company_name = companyName
});
var summary = await cursor.ConsumeAsync();
relationshipsCreated.Add(summary.Counters.RelationshipsCreated);
}
return relationshipsCreated.Sum();
});
}
消费结果
异步会话 API 提供了语言惯用的方法,以帮助与异步应用程序和框架集成。
public async Task<List<string>> GetPeopleAsync()
{
await using var session = Driver.AsyncSession();
return await session.ExecuteReadAsync(
async tx =>
{
var result = await tx.RunAsync("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return await result.ToListAsync(r => r[0].As<string>());
});
}
反应式会话
从 Neo4j 4.0 开始,支持查询的反应式处理。这可以通过反应式会话实现。反应式会话允许动态管理驱动程序和服务器之间交换的数据。
与反应式编程典型地,消费者控制他们从查询中消费记录的速度,而驱动程序反过来管理从服务器请求记录的速度。整个 Neo4j 堆栈都支持流控制,这意味着查询引擎正确响应流控制信号。这导致资源处理效率更高,并确保接收方不会被迫缓冲任意数量的数据。
有关反应式流的更多信息,请参阅以下内容
反应式会话通常用于已经面向反应式风格的客户端应用程序中;预计已存在反应式依赖项或框架。 有关推荐依赖项的更多信息,请参阅开始使用。 |
事务函数
这种事务形式需要最少的样板代码,并允许清晰地分离数据库查询和应用程序逻辑。事务函数也很受欢迎,因为它们封装了重试逻辑,并在将单个服务器实例替换为集群时提供了最大程度的灵活性。
函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中适当的服务器。如果您在单实例环境中,此路由没有影响,但如果您稍后选择采用集群环境,它会提供灵活性。
在编写事务函数之前,重要的是要确保事务函数执行的任何副作用都应设计为幂等。这是因为如果初始运行失败,函数可能会被执行多次。
在事务函数中获得的任何查询结果都应在该函数内部使用,因为连接绑定的资源在超出范围时无法正确管理。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序的重试逻辑。对于几种失败情况,事务可以立即在不同的服务器上重试。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。重试逻辑可以在创建会话时配置。 |
ExecuteRead 和 ExecuteWrite 方法已取代 ReadTransaction 和 WriteTransaction ,后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。 |
public IObservable<string> PrintAllProducts()
{
var session = Driver.RxSession();
return session.ExecuteRead(
tx =>
{
return tx.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString());
})
.OnErrorResumeNext(session.Close<string>());
}
会话可以通过多种不同方式配置。这通过在会话构造函数中提供配置来实现。有关更多详细信息,请参阅会话配置。
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序自行处理。
自动提交事务有两个用途
-
简单用例,例如学习 Cypher 或编写一次性脚本时。
-
诸如批量数据加载操作之类的操作,其中驱动程序无法知道已提交的状态,因此无法安全地请求重试。在这种情况下,操作员将不得不执行
retry
或undo
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交了什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public IObservable<string> ReadProductTitles()
{
var session = Driver.RxSession();
return session.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString())
.OnErrorResumeNext(session.Close<string>());
}
消费结果
要在反应式会话中从查询消费数据,需要订阅者来处理发布者返回的结果。
每个事务对应一个从服务器提供数据的数据流。当从该流中拉取记录时,结果处理开始。每个数据流只能有一个订阅者拉取数据。
public IObservable<string> GetPeople()
{
var session = Driver.RxSession();
return session.ExecuteRead(
tx =>
{
return tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
.Records()
.Select(record => record[0].As<string>());
})
.OnErrorResumeNext(session.Close<string>());
}
会话配置
- 书签
-
确保会话内事务之间因果一致性的机制。书签在单个会话的事务之间隐式传递,以满足因果一致性要求。在某些情况下,您可能希望在一个会话中使用书签到另一个新会话中。
默认值:无(会话最初将不带书签创建)
- 默认访问模式
-
未D使用事务函数时访问模式设置的备用方案。通常,访问模式通过调用适当的事务函数方法按事务设置。在其他情况下,此设置是继承的。请注意,事务函数将忽略/覆盖此设置。
默认值:写入
- 数据库
-
会话将与之交互的数据库。当使用非默认数据库(即
system
数据库或 Neo4j 4.0 企业版中的其他数据库)时,您可以明确配置驱动程序执行事务的数据库。数据库别名的解析发生在连接创建时,这不受会话控制。因此,不建议在存在活动会话时更改数据库别名。每个用户的主数据库在会话创建和首次模拟用户时解析。因此,需要创建一个新会话来反映更改后的主数据库。
有关数据库的更多信息,请参阅操作手册 → 默认数据库。
有关模拟的更多信息,请参阅下文和Cypher 手册 →
DBMS IMPERSONATE
权限。默认值:服务器上配置的默认数据库。
查看有关数据库选择的更多信息。
获取大小
-
每次从服务器获取的记录批次数量。Neo4j 4.0 引入了批量拉取记录的能力,允许客户端应用程序控制数据填充并对服务器施加背压。此
FetchSize
适用于简单会话和异步会话,而反应式会话可以直接使用订阅的请求方法进行控制。默认值:1000 条记录
- 模拟用户
-
如果用户已被授予明确权限,他们可以以不同用户的身份对数据库运行事务。模拟用户时,查询将在被模拟用户的完整安全上下文(即主数据库、权限等)中运行,而不是已认证用户。
有关每个用户的默认数据库的更多信息,请参阅操作手册 → 每个用户的主数据库。
默认值:无(会话将使用已登录用户创建)
事务配置
可以为执行的事务提供额外的配置。
事务超时
可以提供超时值,执行时间超过此值的事务将在服务器上终止。此值将覆盖 dbms.transaction.timeout
设置的值。如果未提供值,则采用服务器设置的默认值(请参阅操作手册 → 事务管理)。
注意:在 Neo4j 4.2 到 5.2 版本中,dbms.transaction.timeout
作为驱动程序无法覆盖的最大值。例如,如果服务器设置为 dbms.transaction.timeout=10s
,驱动程序可以指定更短的超时(例如 5 秒),但不能超过 10 秒。如果提供更大的值,事务仍将在 10 秒后超时。在 Neo4j 5.3 及更高版本中,您可以将事务超时设置为任意值,并且驱动程序可以覆盖使用 dbms.transaction.timeout
设置的任何值。
默认值:在服务器上通过 dbms.transaction.timeout
配置。
public void AddPerson(string name)
{
using var session = Driver.Session();
session.ExecuteWrite(
tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume(),
txConfig => txConfig.WithTimeout(TimeSpan.FromSeconds(5)));
}
元数据
事务可以被标记元数据,这些元数据将附加到正在执行的事务上,并在列出事务和查询的各种输出中可见,以及出现在查询日志中。
默认值:无。
public void AddPerson(string name)
{
using var session = Driver.Session();
var txMetadata = new Dictionary<string, object> { { "applicationId", "123" } };
session.ExecuteWrite(
tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume(),
txConfig => txConfig.WithMetadata(txMetadata));
}