会话 API
简单会话
适用于
|
简单会话为 Cypher 执行提供了一个“经典”的**阻塞式** API。通常,简单会话提供了最简单的编程风格,因为 API 调用以严格的顺序方式执行。
生命周期
会话生命周期从会话构建到会话关闭。在支持它们的语言中,简单会话通常在上下文块内进行范围限定;这确保它们被正确关闭,并且任何底层连接都被释放并且不会泄漏。
using (var session = driver.Session(...)) {
// transactions go here
}
session := driver.NewSession(...)
defer session.Close()
// transactions go here
try (Session session = driver.session(...)) {
// transactions go here
}
with driver.session(...) as session:
// transactions go here
**会话可以通过多种不同的方式进行配置**。这是通过在会话构造函数中提供配置来实现的。有关更多详细信息,请参阅会话配置。
事务函数
事务函数用于包含事务工作单元。这种形式的事务需要最少的样板代码,并允许数据库查询和应用程序逻辑之间进行清晰的分离。
事务函数也很理想,因为它们封装了重试逻辑,并在将服务器的单个实例替换为集群时提供了最大的灵活性。
事务函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中操作,则此路由没有影响。如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,**务必确保其设计为幂等的**。这是因为如果初始运行失败,该函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理与连接相关的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
事务函数是包含事务工作单元的推荐形式. **当事务失败时,将调用驱动程序重试逻辑**。对于一些故障情况,可以立即对不同的服务器重试事务。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。 |
public void AddPerson(string name)
{
using (var session = Driver.Session())
{
session.WriteTransaction(tx => tx.Run("CREATE (a:Person {name: $name})", new {name}));
}
}
func addPersonInTxFunc(driver neo4j.Driver, name string) error {
session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()
_, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
result, err := tx.Run("CREATE (a:Person {name: $name})", map[string]interface{}{"name": name})
if err != nil {
return nil, err
}
return result.Consume()
})
return err
}
public void addPerson( final String name )
{
try ( Session session = driver.session() )
{
session.writeTransaction( tx -> {
tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
return 1;
} );
}
}
from neo4j import unit_of_work
@unit_of_work(timeout=5)
def create_person(tx, name):
return tx.run("CREATE (a:Person {name: $name}) RETURN id(a)", name=name).single().value()
def add_person(driver, name):
with driver.session() as session:
return session.write_transaction(create_person, name)
自动提交事务
**自动提交事务**是**一种基本但有限的事务形式**。此类事务仅包含一个 Cypher 查询,并且不会在失败时自动重放。因此,任何错误场景都必须由客户端应用程序本身处理。
自动提交事务**旨在用于简单用例**,例如学习**Cypher**或编写一次性脚本时。
不建议在生产环境中使用自动提交事务。 |
与其他类型的 Cypher 查询不同, 因此,从驱动程序执行 |
public void AddPerson(string name)
{
using (var session = Driver.Session())
{
session.Run("CREATE (a:Person {name: $name})", new {name});
}
}
func addPersonInAutoCommitTx(driver neo4j.Driver, name string) error {
session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()
result, err := session.Run("CREATE (a:Person {name: $name})", map[string]interface{}{"name": name})
if err != nil {
return err
}
if _, err = result.Consume(); err != nil {
return err
}
return nil
}
public void addPerson( String name )
{
try ( Session session = driver.session() )
{
session.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
}
}
from neo4j import Query
def add_person(self, name):
with self.driver.session() as session:
session.run("CREATE (a:Person {name: $name})", name=name)
# Alternative implementation, with a one second timeout
def add_person_within_a_second(self, name):
with self.driver.session() as session:
session.run(Query("CREATE (a:Person {name: $name})", timeout=1.0), name=name)
使用结果
查询结果通常作为记录流来使用。驱动程序提供了以语言惯用方式迭代该流的方法。
public List<string> GetPeople()
{
using (var session = Driver.Session())
{
return session.ReadTransaction(tx =>
{
var result = tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return result.Select(record => record[0].As<string>()).ToList();
});
}
}
func getPeople(driver neo4j.Driver) ([]string, error) {
session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
defer session.Close()
people, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
var list []string
result, err := tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name", nil)
if err != nil {
return nil, err
}
for result.Next() {
list = append(list, result.Record().Values[0].(string))
}
if err = result.Err(); err != nil {
return nil, err
}
return list, nil
})
if err != nil {
return nil, err
}
return people.([]string), nil
}
public List<String> getPeople()
{
try ( Session session = driver.session() )
{
return session.readTransaction( tx -> {
List<String> names = new ArrayList<>();
Result result = tx.run( "MATCH (a:Person) RETURN a.name ORDER BY a.name" );
while ( result.hasNext() )
{
names.add( result.next().get( 0 ).asString() );
}
return names;
} );
}
}
def match_person_nodes(tx):
result = tx.run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
return [record["a.name"] for record in result]
with driver.session() as session:
people = session.read_transaction(match_person_nodes)
保留结果
在一个会话中,**任何时候只能有一个结果流处于活动状态**。因此,如果在执行另一个查询之前未完全使用第一个查询的结果,则第一个结果的其余部分将自动缓冲在结果对象中。
此缓冲区为结果提供了一个暂存点,并将结果处理划分为**获取**(从网络移动到缓冲区)和**使用**(从缓冲区移动到应用程序)。
对于大型结果,结果缓冲区可能需要大量的内存。 因此,**建议尽可能按顺序使用结果**。 |
客户端应用程序可以通过显式保留结果来选择控制更高级别的查询模式。当需要保存结果以供将来处理时,这种显式保留可能也很有用。驱动程序支持此过程,如下面的示例所示
public int AddEmployees(string companyName)
{
using (var session = Driver.Session())
{
var persons =
session.ReadTransaction(tx => tx.Run("MATCH (a:Person) RETURN a.name AS name").ToList());
return persons.Sum(person => session.WriteTransaction(tx =>
{
tx.Run("MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
new {person_name = person["name"].As<string>(), company_name = companyName});
return 1;
}));
}
}
func addPersonsAsEmployees(driver neo4j.Driver, companyName string) (int, error) {
session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()
persons, err := neo4j.Collect(session.Run("MATCH (a:Person) RETURN a.name AS name", nil))
if err != nil {
return 0, err
}
employees := 0
for _, person := range persons {
_, err = session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
return tx.Run("MATCH (emp:Person {name: $person_name}) "+
"MERGE (com:Company {name: $company_name}) "+
"MERGE (emp)-[:WORKS_FOR]->(com)", map[string]interface{}{"person_name": person.Values[0], "company_name": companyName})
})
if err != nil {
return 0, err
}
employees++
}
return employees, nil
}
public int addEmployees( final String companyName )
{
try ( Session session = driver.session() )
{
int employees = 0;
List<Record> persons = session.readTransaction( new TransactionWork<List<Record>>()
{
@Override
public List<Record> execute( Transaction tx )
{
return matchPersonNodes( tx );
}
} );
for ( final Record person : persons )
{
employees += session.writeTransaction( new TransactionWork<Integer>()
{
@Override
public Integer execute( Transaction tx )
{
tx.run( "MATCH (emp:Person {name: $person_name}) " +
"MERGE (com:Company {name: $company_name}) " +
"MERGE (emp)-[:WORKS_FOR]->(com)",
parameters( "person_name", person.get( "name" ).asString(), "company_name",
companyName ) );
return 1;
}
} );
}
return employees;
}
}
private static List<Record> matchPersonNodes( Transaction tx )
{
return tx.run( "MATCH (a:Person) RETURN a.name AS name" ).list();
}
def add_employee_to_company(tx, person, company_name):
tx.run("MATCH (emp:Person {name: $person_name}) "
"MERGE (com:Company {name: $company_name}) "
"MERGE (emp)-[:WORKS_FOR]->(com)",
person_name=person["name"], company_name=company_name)
return 1
def match_person_nodes(tx):
return list(tx.run("MATCH (a:Person) RETURN a.name AS name"))
def add_employees(company_name):
employees = 0
with driver.session() as session:
persons = session.read_transaction(match_person_nodes)
for person in persons:
employees += session.write_transaction(add_employee_to_company, person, company_name)
return employees
异步会话
适用于
|
异步会话提供了一个 API,其中函数调用通常返回可用的对象,例如期货。这允许客户端应用程序在异步框架内工作并利用协作式多任务处理。
生命周期
会话生命周期从会话构建开始。然后,会话一直存在,直到它关闭,通常设置为在包含的查询结果使用后发生。
会话可以通过多种不同的方式进行配置。这是通过在会话构造函数中提供配置来实现的。有关更多详细信息,请参阅会话配置。
事务函数
事务函数是**包含事务工作单元的推荐形式**。这种形式的事务需要最少的样板代码,并允许数据库查询和应用程序逻辑之间进行清晰的分离。事务函数也很理想,因为它们封装了重试逻辑,并在将服务器的单个实例替换为集群时提供了最大的灵活性。
函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中,则此路由没有影响,但如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,务必确保事务函数执行的任何副作用都设计为幂等的。这是因为如果初始运行失败,该函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理与连接相关的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序重试逻辑。对于一些故障情况,可以立即对不同的服务器重试事务。 这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。在创建会话时可以配置重试逻辑。 |
public async Task<List<string>> PrintAllProducts()
{
List<string> result = null;
var session = Driver.AsyncSession();
try
{
// Wrap whole operation into an managed transaction and
// get the results back.
result = await session.ReadTransactionAsync(async tx =>
{
var products = new List<string>();
// Send cypher query to the database
var reader = await tx.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new {id = 0} // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
{
// Each current read in buffer can be reached via Current
products.Add(reader.Current[0].ToString());
}
return products;
});
}
finally
{
// asynchronously close session
await session.CloseAsync();
}
return result;
}
public CompletionStage<ResultSummary> printAllProducts()
{
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
AsyncSession session = driver.asyncSession();
return session.readTransactionAsync( tx ->
tx.runAsync( query, parameters )
.thenCompose( cursor -> cursor.forEachAsync( record ->
// asynchronously print every record
System.out.println( record.get( 0 ).asString() ) ) )
);
}
const session = driver.session()
const titles = []
try {
const result = await session.readTransaction(tx =>
tx.run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', { id: 0 })
)
const records = result.records
for (let i = 0; i < records.length; i++) {
const title = records[i].get(0)
titles.push(title)
}
} finally {
await session.close()
}
自动提交事务
自动提交事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在发生错误时不会自动重放。因此,任何错误场景都必须由客户端应用程序本身处理。
自动提交事务旨在用于简单的用例,例如学习 Cypher 或编写一次性脚本时。
不建议在生产环境中使用自动提交事务。 |
与其他类型的 Cypher 查询不同, 因此,从驱动程序执行它们唯一的方法是使用自动提交事务。 |
public async Task<List<string>> ReadProductTitles()
{
var records = new List<string>();
var session = Driver.AsyncSession();
try
{
// Send cypher query to the database.
// The existing IResult interface implements IEnumerable
// and does not play well with asynchronous use cases. The replacement
// IResultCursor interface is returned from the RunAsync
// family of methods instead and provides async capable methods.
var reader = await session.RunAsync(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new {id = 0} // Parameters in the query, if any
);
// Loop through the records asynchronously
while (await reader.FetchAsync())
{
// Each current read in buffer can be reached via Current
records.Add(reader.Current[0].ToString());
}
}
finally
{
// asynchronously close session
await session.CloseAsync();
}
return records;
}
public CompletionStage<List<String>> readProductTitles()
{
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
AsyncSession session = driver.asyncSession();
return session.runAsync( query, parameters )
.thenCompose( cursor -> cursor.listAsync( record -> record.get( 0 ).asString() ) )
.exceptionally( error ->
{
// query execution failed, print error and fallback to empty list of titles
error.printStackTrace();
return Collections.emptyList();
} )
.thenCompose( titles -> session.closeAsync().thenApply( ignore -> titles ) );
}
async function readProductTitles () {
const session = driver.session()
try {
const result = await session.run(
'MATCH (p:Product) WHERE p.id = $id RETURN p.title',
{
id: 0
}
)
const records = result.records
const titles = []
for (let i = 0; i < records.length; i++) {
const title = records[i].get(0)
titles.push(title)
}
return titles
} finally {
await session.close()
}
}
使用结果
异步会话 API 提供了语言惯用的方法来帮助与异步应用程序和框架集成。
public async Task<List<string>> GetPeopleAsync()
{
var session = Driver.AsyncSession();
try
{
return await session.ReadTransactionAsync(async tx =>
{
var result = await tx.RunAsync("MATCH (a:Person) RETURN a.name ORDER BY a.name");
return await result.ToListAsync(r => r[0].As<string>());
});
}
finally
{
await session.CloseAsync();
}
}
public CompletionStage<List<String>> getPeople()
{
String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";
AsyncSession session = driver.asyncSession();
return session.readTransactionAsync( tx ->
tx.runAsync( query )
.thenCompose( cursor -> cursor.listAsync( record ->
record.get( 0 ).asString() ) )
);
}
const session = driver.session()
const result = session.run('MATCH (a:Person) RETURN a.name ORDER BY a.name')
const collectedNames = []
result.subscribe({
onNext: record => {
const name = record.get(0)
collectedNames.push(name)
},
onCompleted: () => {
session.close().then(() => {
console.log('Names: ' + collectedNames.join(', '))
})
},
onError: error => {
console.log(error)
}
})
反应式会话
适用于
|
从**Neo4j 4.0**开始,支持查询的反应式处理。这可以通过反应式会话来实现。反应式会话允许动态管理驱动程序和服务器之间交换的数据。
反应式编程的典型特征是,使用者控制从查询中使用记录的速率,而驱动程序则反过来管理从服务器请求记录的速率。在整个 Neo4j 堆栈中都支持流控制,这意味着查询引擎会正确响应流控制信号。这导致更有效的资源处理,并确保接收方不会被迫缓冲任意数量的数据。
有关反应式流的更多信息,请参阅以下内容
反应式会话通常用于已经面向反应式风格的客户端应用程序;预计会使用反应式依赖项或框架。 有关推荐依赖项的更多信息,请参阅入门。 |
事务函数
这种形式的事务需要最少的样板代码,并允许将数据库查询和应用程序逻辑清晰地分离。事务函数也很受欢迎,因为它们封装了重试逻辑,并在将服务器的单个实例替换为集群时提供了最大程度的灵活性。
函数可以作为读操作或写操作调用。此选择会将事务路由到集群环境中的适当服务器。如果您在单实例环境中,则此路由没有影响,但如果您以后选择采用集群环境,它确实为您提供了灵活性。
在编写事务函数之前,务必确保事务函数执行的任何副作用都设计为幂等的。这是因为如果初始运行失败,该函数可能会执行多次。
在事务函数中获得的任何查询结果都应在该函数内使用,因为当超出范围时,无法正确管理与连接相关的资源。为此,事务函数可以返回值,但这些值应该是派生值而不是原始结果。
当事务失败时,将调用驱动程序重试逻辑。对于一些故障案例,可以立即对不同的服务器重试事务。这些情况包括连接问题、服务器角色更改(例如领导者选举)和瞬态错误。在创建会话时可以配置重试逻辑。 |
public IObservable<string> PrintAllProducts()
{
var session = Driver.RxSession();
return session.ReadTransaction(tx =>
{
return tx.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new {id = 0} // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString());
}).OnErrorResumeNext(session.Close<string>());
}
public Flux<ResultSummary> printAllProducts()
{
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
session -> session.readTransaction( tx -> {
RxResult result = tx.run( query, parameters );
return Flux.from( result.records() )
.doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).then( Mono.from( result.consume() ) );
}
), RxSession::close );
}
const session = driver.rxSession()
const result = session.readTransaction(tx =>
tx
.run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', { id: 0 })
.records()
.pipe(
map(r => r.get(0)),
materialize(),
toArray()
)
)
会话可以通过多种不同的方式进行配置。这是通过在会话构造函数中提供配置来实现的。有关更多详细信息,请参阅会话配置。
自动提交事务
自动提交事务是一种基本但有限的事务形式。此类事务仅包含一个 Cypher 查询,并且在发生错误时不会自动重放。因此,任何错误场景都需要由客户端应用程序本身处理。
自动提交事务旨在用于简单的用例,例如学习**Cypher**或编写一次性脚本时。
不建议在生产环境中使用自动提交事务。 |
执行 与其他类型的 Cypher 查询不同, |
public IObservable<string> ReadProductTitles()
{
var session = Driver.RxSession();
return session.Run(
"MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
new {id = 0} // Parameters in the query, if any
)
.Records()
.Select(record => record[0].ToString())
.OnErrorResumeNext(session.Close<string>());
}
public Flux<String> readProductTitles()
{
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
session -> Flux.from( session.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
RxSession::close );
}
function readProductTitles () {
const session = driver.rxSession()
return session
.run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', {
id: 0
})
.records()
.pipe(
map(r => r.get(0)),
materialize(),
toArray()
)
}
使用结果
要在反应式会话中使用查询中的数据,需要一个订阅者来处理发布者返回的结果。
每个事务对应一个数据流,该数据流提供来自服务器的数据。当从该流中提取记录时,结果处理开始。只有一个订阅者可以从给定的流中提取数据。
public IObservable<string> GetPeople()
{
var session = Driver.RxSession();
return session.ReadTransaction(tx =>
{
return tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
.Records()
.Select(record => record[0].As<string>());
}).OnErrorResumeNext(session.Close<string>());
}
public Flux<String> getPeople()
{
String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";
return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
session -> session.readTransaction( tx -> {
RxResult result = tx.run( query );
return Flux.from( result.records() )
.map( record -> record.get( 0 ).asString() );
}
), RxSession::close );
}
const session = driver.rxSession()
const result = session
.run('MATCH (a:Person) RETURN a.name ORDER BY a.name')
.records()
.pipe(
map(r => r.get(0)),
materialize(),
toArray()
)
会话配置
书签
-
确保会话内事务之间因果一致性的机制。书签在单个会话内的事务之间隐式传递,以满足因果一致性要求。在某些情况下,您可能希望在一个会话中使用书签在另一个新会话中使用。
**默认值:**无(会话最初将创建时不带书签)
- 默认访问模式
-
当不使用事务函数时,访问模式设置的回退。通常,访问模式通过调用相应的事务函数方法在每个事务中设置。在其他情况下,此设置将被继承。请注意,事务函数将忽略/覆盖此设置。
**默认值:**写入
- 数据库
-
会话将与之交互的数据库。当您使用非默认数据库(即
system
数据库或 Neo4j 4.0 企业版中的其他数据库)时,您可以显式配置驱动程序正在执行事务的数据库。有关数据库的更多信息,请参阅操作手册→默认数据库。**默认值:**服务器上配置的默认数据库。
获取大小
-
从服务器的每个批次中获取的记录数。Neo4j 4.0 引入了按批次提取记录的功能,允许客户端应用程序控制数据填充并向服务器应用反压。此
FetchSize
适用于简单会话和异步会话,而反应式会话可以使用订阅的请求方法直接控制。**默认值:**1000 条记录