会话 API
本节详细介绍了驱动程序提供的会话 API。
简单会话
简单会话为 Cypher 执行提供了一个“经典”的**阻塞式** API。通常,简单会话提供了最简单的编程风格,因为 API 调用以严格的顺序方式执行。
生命周期
会话生命周期从会话构造到会话关闭。在支持它们的语言中,简单会话通常在上下文块内定义范围;这确保了它们被正确关闭,并且任何底层连接都被释放并且不会泄漏。
using (var session = driver.Session(...)) {
// transactions go here
}
**会话可以通过多种不同的方式进行配置**。这通过在会话构造函数中提供配置来实现。有关更多详细信息,请参阅会话配置。
事务函数
事务函数用于包含事务性工作单元。这种形式的事务需要最少的样板代码,并允许数据库查询和应用程序逻辑之间进行清晰的分离。
事务函数也很理想,因为它们封装了重试逻辑,并在将服务器的单个实例替换为集群时提供了最大的灵活性。
事务函数可以作为读取或写入操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中操作,则此路由没有影响。如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,**务必确保它设计为幂等的**。这是因为如果初始运行失败,函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理连接绑定的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
事务函数是包含事务性工作单元的推荐形式。. **当事务失败时,将调用驱动程序重试逻辑**。对于某些故障情况,可以立即针对不同的服务器重试事务。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。 |
方法ExecuteRead 和ExecuteWrite 已替换ReadTransaction 和WriteTransaction ,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
public void AddPerson(string name)
{
using var session = Driver.Session();
session.ExecuteWrite(tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume());
}
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序本身处理。
自动提交事务用于以下目的
-
简单的用例,例如学习 Cypher 或编写一次性脚本时。
-
操作,例如批量数据加载操作,其中驱动程序无法了解已提交的状态,因此无法安全地请求重试。在这种情况下,操作员必须执行
重试
或撤消
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public void AddPerson(string name)
{
using var session = Driver.Session();
session.Run("CREATE (a:Person {name: $name})", new { name });
}
使用结果
查询结果通常作为记录流使用。驱动程序提供了一种遍历该流的方法。
public List<string> GetPeople()
{
using var session = Driver.Session();
return session.ExecuteRead(
tx =>
{
var result = tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return result.Select(record => record[0].As<string>()).ToList();
});
}
保留结果
在会话中,**一次只能激活一个结果流**。因此,如果在一个查询的结果在执行另一个查询之前没有完全使用,则第一个结果的其余部分将自动缓存在结果对象中。
此缓冲区为结果提供了一个暂存点,并将结果处理分为**获取**(从网络移动到缓冲区)和**使用**(从缓冲区移动到应用程序)。
对于大型结果,结果缓冲区可能需要大量的内存。 因此,**建议尽可能按顺序使用结果**。 |
客户端应用程序可以通过显式保留结果来选择控制更高级的查询模式。当需要保存结果以供将来处理时,此类显式保留也可能很有用。驱动程序根据以下示例提供了对此过程的支持
public int AddEmployees(string companyName)
{
using var session = Driver.Session();
var persons = session.ExecuteRead(tx => tx.Run("MATCH (a:Person) RETURN a.name AS name").ToList());
return persons.Sum(
person => session.ExecuteWrite(
tx =>
{
var result = tx.Run(
"MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
new { person_name = person["name"].As<string>(), company_name = companyName });
result.Consume();
return 1;
}));
}
异步会话
异步会话提供了一个 API,其中函数调用通常返回可用的对象,例如期货。这允许客户端应用程序在异步框架内工作并利用协作式多任务处理。
生命周期
会话生命周期从会话构造开始。然后,会话一直存在,直到它被关闭,这通常设置为在其包含的查询结果被使用后发生。
会话可以通过多种不同的方式进行配置。这通过在会话构造函数中提供配置来实现。
有关更多详细信息,请参阅会话配置。
事务函数
事务函数是**包含事务性工作单元的推荐形式**。这种形式的事务需要最少的样板代码,并允许数据库查询和应用程序逻辑之间进行清晰的分离。事务函数也很理想,因为它们封装了重试逻辑,并在将服务器的单个实例替换为集群时提供了最大的灵活性。
函数可以作为读取或写入操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中,则此路由没有影响,但如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,务必确保事务函数执行的任何副作用都设计为幂等的。这是因为如果初始运行失败,函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理连接绑定的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序重试逻辑。对于某些故障情况,可以立即针对不同的服务器重试事务。 这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。在创建会话时可以配置重试逻辑。 |
方法ExecuteReadAsync 和ExecuteWriteAsync 已替换ReadTransactionAsync 和WriteTransactionAsync ,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
public async Task<List<string>> PrintAllProducts()
{
await using var session = Driver.AsyncSession();
// Wrap whole operation into an managed transaction and
// get the results back.
return await session.ExecuteReadAsync(
async tx =>
{
var products = new List<string>();
// Send cypher query to the database
var reader = await tx.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
// Each current read in buffer can be reached via Current
{
products.Add(reader.Current[0].ToString());
}
return products;
});
}
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序本身处理。
自动提交事务用于两个目的
-
简单的用例,例如学习 Cypher 或编写一次性脚本时。
-
操作,例如批量数据加载操作,其中驱动程序无法了解已提交的状态,因此无法安全地请求重试。在这种情况下,操作员必须执行
重试
或撤消
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public async Task<List<string>> ReadProductTitles()
{
var records = new List<string>();
await using var session = Driver.AsyncSession();
// Send cypher query to the database.
// The existing IResult interface implements IEnumerable
// and does not play well with asynchronous use cases. The replacement
// IResultCursor interface is returned from the RunAsync
// family of methods instead and provides async capable methods.
var reader = await session.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
// Each current read in buffer can be reached via Current
{
records.Add(reader.Current[0].ToString());
}
return records;
}
组合事务
驱动程序提供了一种语言惯用的方式来在一个异步会话中组合多个事务。
public async Task<int?> EmployEveryoneInCompany(string companyName)
{
await using var session = Driver.AsyncSession();
var names = await session.ExecuteReadAsync(
async tx =>
{
var cursor = await tx.RunAsync("MATCH (a:Person) RETURN a.name AS name");
var people = await cursor.ToListAsync();
return people.Select(person => person["name"].As<string>());
});
return await session.ExecuteWriteAsync(
async tx =>
{
var relationshipsCreated = new List<int>();
foreach (var personName in names)
{
var cursor = await tx.RunAsync(
"MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
new
{
person_name = personName,
company_name = companyName
});
var summary = await cursor.ConsumeAsync();
relationshipsCreated.Add(summary.Counters.RelationshipsCreated);
}
return relationshipsCreated.Sum();
});
}
使用结果
异步会话 API 提供了语言惯用的方法来帮助与异步应用程序和框架集成。
public async Task<List<string>> GetPeopleAsync()
{
await using var session = Driver.AsyncSession();
return await session.ExecuteReadAsync(
async tx =>
{
var result = await tx.RunAsync("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return await result.ToListAsync(r => r[0].As<string>());
});
}
反应式会话
从 **Neo4j 4.0** 开始,支持查询的反应式处理。这可以通过反应式会话来实现。反应式会话允许动态管理在驱动程序和服务器之间交换的数据。
反应式编程的典型特征是,使用者控制从查询中使用记录的速率,而驱动程序则管理从服务器请求记录的速率。在整个 Neo4j 堆栈中都支持流控制,这意味着查询引擎会对流控制信号做出正确的响应。这使得资源处理更加高效,并确保接收方不会被迫缓冲任意数量的数据。
有关反应式流的更多信息,请参阅以下内容:
反应式会话通常用于已经面向反应式风格的客户端应用程序;预计已经存在反应式依赖项或框架。 有关推荐依赖项的更多信息,请参阅 入门。 |
事务函数
这种形式的事务需要最少的样板代码,并允许清晰地分离数据库查询和应用程序逻辑。事务函数也很理想,因为它们封装了重试逻辑,并在将单个服务器实例替换为集群时提供了最大程度的灵活性。
函数可以作为读取或写入操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中,则此路由没有影响,但如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,务必确保事务函数执行的任何副作用都设计为幂等的。这是因为如果初始运行失败,函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理连接绑定的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序重试逻辑。对于某些故障情况,可以立即对不同的服务器重试事务。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。在创建会话时可以配置重试逻辑。 |
方法ExecuteRead 和ExecuteWrite 已替换ReadTransaction 和WriteTransaction ,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
public IObservable<string> PrintAllProducts()
{
var session = Driver.RxSession();
return session.ExecuteRead(
tx =>
{
return tx.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString());
})
.OnErrorResumeNext(session.Close<string>());
}
会话可以通过多种不同的方式进行配置。这是通过在会话构造函数中提供配置来执行的。有关更多详细信息,请参阅 会话配置。
自动提交事务(或隐式事务)
自动提交事务或隐式事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在失败时不会自动重试。因此,任何错误场景都需要由客户端应用程序本身处理。
自动提交事务用于两个目的
-
简单的用例,例如学习 Cypher 或编写一次性脚本时。
-
操作,例如批量数据加载操作,其中驱动程序无法了解已提交的状态,因此无法安全地请求重试。在这种情况下,操作员必须执行
重试
或撤消
。
驱动程序在失败时不会重试自动提交查询,因为它不知道在失败时已提交什么状态。 |
与其他类型的 Cypher 查询不同, 有关更多详细信息,请参阅以下内容 |
public IObservable<string> ReadProductTitles()
{
var session = Driver.RxSession();
return session.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new { id = 0 } // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString())
.OnErrorResumeNext(session.Close<string>());
}
使用结果
要在反应式会话中使用查询中的数据,需要一个订阅者来处理发布者返回的结果。
每个事务对应一个数据流,该数据流提供来自服务器的数据。当从此流中提取记录时,结果处理开始。只有一个订阅者可以从给定的流中提取数据。
public IObservable<string> GetPeople()
{
var session = Driver.RxSession();
return session.ExecuteRead(
tx =>
{
return tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
.Records()
.Select(record => record[0].As<string>());
})
.OnErrorResumeNext(session.Close<string>());
}
会话配置
- 书签
-
确保会话内事务之间因果一致性的机制。书签在单个会话内的事务之间隐式传递,以满足因果一致性要求。在某些情况下,您可能希望在一个会话中使用书签在另一个新的会话中使用。
**默认值:**无(会话最初将创建时不带书签)
- 默认访问模式
-
当不使用事务函数时,访问模式设置的回退。通常,通过调用相应的事务函数方法为每个事务设置访问模式。在其他情况下,此设置将被继承。请注意,事务函数将忽略/覆盖此设置。
**默认值:**写入
- 数据库
-
会话将与其交互的数据库。当使用非默认数据库(即
system
数据库或 Neo4j 4.0 Enterprise Edition 中的另一个数据库)时,您可以显式配置驱动程序正在针对其执行事务的数据库。数据库别名的解析发生在连接创建时,不受会话控制。因此,不建议在存在活动会话时更改数据库别名。每个用户的 home 数据库在会话创建时以及首次模拟用户时解析。因此,需要创建新的会话以反映已更改的 home 数据库。
有关数据库的更多信息,请参阅 操作手册 → 默认数据库。
有关模拟的更多信息,请参阅以下内容和 Cypher 手册 →
DBMS IMPERSONATE
权限。**默认值:**服务器上配置的默认数据库。
有关 数据库选择 的更多信息,请参阅。
获取大小
-
每次从服务器获取的批次中的记录数。Neo4j 4.0 引入了分批提取记录的功能,允许客户端应用程序控制数据填充并对服务器施加反压。此
FetchSize
适用于 简单会话 和 异步会话,而反应式会话可以使用订阅的请求方法直接控制。**默认值:**1000 条记录
- 模拟用户
-
如果用户被授予执行此操作的显式权限,则用户可以以不同用户的身份对数据库运行事务。模拟用户时,查询在模拟用户的完整安全上下文中运行,而不是在经过身份验证的用户(即 home 数据库、权限等)的上下文中运行。
有关每个用户的默认数据库的更多信息,请参阅 操作手册 → 每个用户的 home 数据库。
**默认值:**无(会话将使用已登录的用户创建)
事务配置
可以为执行的事务提供其他配置。
事务超时
可以提供超时值,并且在服务器上执行时间超过此值的事务将被终止。此值将覆盖由 dbms.transaction.timeout
设置的值。如果未提供值,则由服务器设置采用默认值(请参阅 操作手册 → 事务管理)。
注意:在 Neo4j 4.2 到 5.2 版本的上下文中,dbms.transaction.timeout
充当驱动程序无法覆盖的最大值。例如,如果服务器设置为 dbms.transaction.timeout=10s
,则驱动程序可以指定较短的超时时间(例如 5 秒),但不能指定大于 10 秒的值。如果提供更大的值,事务仍将在 10 秒后超时。在 Neo4j 5.3 及更高版本中,您可以将事务超时设置为所需的任何值,并且驱动程序可以覆盖使用 dbms.transaction.timeout
设置的任何值。
**默认值:**使用 dbms.transaction.timeout
在服务器上配置。
public void AddPerson(string name)
{
using var session = Driver.Session();
session.ExecuteWrite(
tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume(),
txConfig => txConfig.WithTimeout(TimeSpan.FromSeconds(5)));
}
元数据
事务可以使用元数据进行标记,这些元数据将附加到正在执行的事务,并在列出事务和查询以及出现在查询日志中的各种输出中可见。
**默认值:**无。
public void AddPerson(string name)
{
using var session = Driver.Session();
var txMetadata = new Dictionary<string, object> { { "applicationId", "123" } };
session.ExecuteWrite(
tx => tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume(),
txConfig => txConfig.WithMetadata(txMetadata));
}