运行您自己的事务
当使用 ExecuteQuery()
查询数据库时,驱动程序会自动创建一个事务。事务是工作单元,要么完全提交,要么在失败时回滚。例如,当使用 MATCH
和 CREATE
按顺序更新数据库时,您可以在单个查询中包含多个 Cypher 语句,但您不能有多个查询并在它们之间插入一些客户端逻辑。
对于这些更高级的用例,驱动程序提供了用于完全控制事务生命周期的函数。这些被称为托管事务,您可以将它们视为解开 executableQuery()
的流程并能够在更多位置指定其所需行为的一种方式。
创建会话
运行托管事务
一个事务可以包含任意数量的查询。由于 Neo4j 符合ACID,因此事务中的查询要么全部执行,要么根本不执行:您无法使事务的一部分成功而另一部分失败。使用事务将相关查询组合在一起,这些查询协同工作以实现单个逻辑数据库操作。
托管事务是使用SessionWithContext.ExecuteRead()
和SessionWithContext.ExecuteWrite()
方法创建的,具体取决于您是想从数据库中检索数据还是更改它。这两种方法都采用事务函数回调,该回调负责实际执行查询和处理结果。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"}) (1)
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx, (2)
func(tx neo4j.ManagedTransaction) (any, error) { (3)
result, err := tx.Run(ctx, ` (4)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
`, map[string]any{
"filter": "Al",
})
if err != nil {
return nil, err
}
records, err := result.Collect(ctx) (5)
if err != nil {
return nil, err
}
return records, nil
})
for _, person := range people.([]*neo4j.Record) {
fmt.Println(person.AsMap())
}
1 | 创建一个会话。单个会话可以容纳多个查询。完成后请记住关闭它(在这里我们defer 其打开后的关闭)。 |
2 | .ExecuteRead() (或.ExecuteWrite() )方法是事务的入口点。 |
3 | 事务函数回调负责运行查询。 |
4 | 使用ManagedTransaction.Run() 方法运行查询。每个运行的查询都返回一个ResultWithContext 对象。 |
5 | 处理结果,使用 ResultWithContext 上的任何方法。.Collect() 方法将所有记录检索到列表中。 |
不要将参数硬编码或直接连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数永远不应该直接返回结果对象。相反,始终以某种方式处理结果。在事务函数中,如果error
为nil
,则return
语句会导致事务提交,而如果返回的error
值不为nil
,则事务会自动回滚。
.ExecuteRead() 和.ExecuteWrite() 方法已取代.ReadTransaction() 和.WriteTransaction() 方法,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
package main
import (
"fmt"
"context"
"strconv"
"errors"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
func main() {
ctx := context.Background()
var employeeThreshold int64 = 10 // Neo4j's integer maps to Go's int64
// Connection to database
dbUri := "<URI for Neo4j database>"
dbUser := "<Username>"
dbPassword := "<Password>"
driver, err := neo4j.NewDriverWithContext(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
// Create 100 people and assign them to various organizations
for i := 0; i < 100; i++ {
name := "Thor" + strconv.Itoa(i)
orgId, err := session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
var orgId string
// Create new Person node with given name, if not exists already
_, err := tx.Run(
ctx,
"MERGE (p:Person {name: $name})",
map[string]any{
"name": name,
})
if err != nil {
return nil, err
}
// Obtain most recent organization ID and the number of people linked to it
result, err := tx.Run(
ctx, `
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
`, nil)
if err != nil {
return nil, err
}
org, err := result.Single(ctx)
// If no organization exists, create one and add Person to it
if org == nil {
orgId, _ = createOrganization(ctx, tx)
fmt.Println("No orgs available, created", orgId)
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, errors.New("Failed to add person to new org")
// Transaction will roll back
// -> not even Person and/or Organization is created!
}
} else {
orgId = org.AsMap()["id"].(string)
if employeesN := org.AsMap()["employeesN"].(int64);
employeesN == 0 {
return nil, errors.New("Most recent organization is empty")
// Transaction will roll back
// -> not even Person is created!
}
// If org does not have too many employees, add this Person to it
if employeesN := org.AsMap()["employeesN"].(int64);
employeesN < employeeThreshold {
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person is created!
}
// Otherwise, create a new Organization and link Person to it
} else {
orgId, err = createOrganization(ctx, tx)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person is created!
}
fmt.Println("Latest org is full, created", orgId)
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person and/or Organization is created!
}
}
}
// Return the Organization ID to which the new Person ends up in
return orgId, nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println("User", name, "added to organization", orgId)
}
}
}
func createOrganization(ctx context.Context, tx neo4j.ManagedTransaction) (string, error) {
result, err := tx.Run(
ctx, `
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
RETURN o.id AS id
`, nil)
if err != nil {
return "", err
}
org, err := result.Single(ctx)
if err != nil {
return "", err
}
orgId, _ := org.AsMap()["id"]
return orgId.(string), err
}
func addPersonToOrganization(ctx context.Context, tx neo4j.ManagedTransaction, personName string, orgId string) (error) {
_, err := tx.Run(
ctx, `
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[:WORKS_FOR]->(o)
`, map[string]any{
"orgId": orgId,
"name": personName,
})
return err
}
如果事务由于驱动程序认为是瞬态的原因而失败,它会自动重试运行事务函数(并以指数递增的延迟)。因此,事务函数必须是幂等的(即,多次运行时应产生相同的效果),因为您事先不知道它们将执行多少次。在实践中,这意味着您不应该编辑或依赖全局变量,例如。请注意,尽管事务函数可能会执行多次,但其中的查询始终只会执行一次。
一个会话可以链接多个事务,但在任何给定时间,一个会话中只能有一个事务处于活动状态。要维护多个并发事务,请使用多个并发会话。
运行显式事务
您可以通过使用SessionWithContext.BeginTransaction()
方法手动开始事务来完全控制事务。您使用ExplicitTransaction.Run()
方法在显式事务中运行查询。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
tx, err := session.BeginTransaction(ctx)
if err != nil {
panic(err)
}
// use tx.Run() to run queries
// tx.Commit() to commit the transaction
// tx.Rollback() to rollback the transaction
显式事务可以使用ExplicitTransaction.Commit()
提交,或者使用ExplicitTransaction.Rollback()
回滚。如果没有采取任何显式操作,驱动程序会在其生命周期结束时自动回滚事务。
显式事务最适用于需要跨多个函数分配 Cypher 执行以进行相同事务的应用程序,或需要在单个事务中运行多个查询但不需要托管事务提供的自动重试的应用程序。
package main
import (
"fmt"
"context"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
func main() {
ctx := context.Background()
// Connection to database
dbUri := "<URI for Neo4j database>"
dbUser := "<Username>"
dbPassword := "<Password>"
driver, err := neo4j.NewDriverWithContext(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
customerId, err := createCustomer(ctx, driver)
if err != nil {
panic(err)
}
otherBankId := 42
transferToOtherBank(ctx, driver, customerId, otherBankId, 999)
}
func createCustomer(ctx context.Context, driver neo4j.DriverWithContext) (string, error) {
result, err := neo4j.ExecuteQuery(ctx, driver, `
MERGE (c:Customer {id: randomUUID()})
RETURN c.id AS id
`, nil,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
if err != nil {
return "", err
}
customerId, _ := result.Records[0].Get("id")
return customerId.(string), err
}
func transferToOtherBank(ctx context.Context, driver neo4j.DriverWithContext, customerId string, otherBankId int, amount float32) {
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
tx, err := session.BeginTransaction(ctx)
if err != nil {
panic(err)
}
if ! customerBalanceCheck(ctx, tx, customerId, amount) {
// give up
return
}
otherBankTransferApi(ctx, customerId, otherBankId, amount)
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
err = decreaseCustomerBalance(ctx, tx, customerId, amount)
if err != nil {
requestInspection(ctx, customerId, otherBankId, amount, err)
}
err = tx.Commit(ctx)
if err != nil {
requestInspection(ctx, customerId, otherBankId, amount, err)
}
}
func customerBalanceCheck(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (bool) {
result, err := tx.Run(ctx, `
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
`, map[string]any{
"id": customerId,
"amount": amount,
})
if err == nil {
return false
}
record, err := result.Single(ctx)
if err == nil {
return false
}
sufficient := record.AsMap()["sufficient"]
return sufficient.(bool)
}
func otherBankTransferApi(ctx context.Context, customerId string, otherBankId int, amount float32) {
// make some API call to other bank
}
func decreaseCustomerBalance(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (error) {
_, err := tx.Run(ctx, `
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
`, map[string]any{
"id": customerId,
"amount": amount,
})
return err
}
func requestInspection(ctx context.Context, customerId string, otherBankId int, amount float32, err error) {
// manual cleanup required; log this or similar
fmt.Println("WARNING: transaction rolled back due to exception:", err)
fmt.Println("customerId:", customerId, "otherBankId:", otherBankId, "amount:", amount)
}
处理查询结果
驱动程序的查询输出是一个ResultWithContext
对象,它不直接包含结果记录。相反,它将 Cypher 结果封装在一个丰富的数据结构中,需要在客户端进行一些解析。需要注意两点
-
服务器不会立即完全获取并返回结果记录。相反,结果以延迟流的形式返回。具体来说,当驱动程序从服务器接收到一些记录时,它们最初会被缓冲到后台队列中。记录会保留在缓冲区中,直到被应用程序使用,此时它们会被从缓冲区中移除。当没有更多记录可用时,结果即为耗尽。
-
结果充当游标。这意味着无法从流中检索之前的记录,除非您将其保存在辅助数据结构中。
下面的动画演示了一个查询的执行路径:它展示了驱动程序如何处理结果记录以及应用程序应该如何处理结果。
处理结果的最简单方法是在其上调用.Collect(ctx)
,它会生成一个包含Record
对象的数组。否则,ResultWithContext
对象实现了一些用于处理记录的方法。下面列出了最常用的方法。
名称 | 描述 |
---|---|
|
将结果的剩余部分作为列表返回。 |
|
返回下一个且唯一的剩余记录,或 如果有多于(或少于)一条记录可用,则会返回非 |
|
返回当前记录。 |
|
如果在当前记录之后还有记录需要处理,则返回 |
|
返回查询的结果摘要。它会耗尽结果,因此只应在数据处理完成后调用。 |
有关ResultWithContext
方法的完整列表,请参阅API 文档 → ResultWithContext。
会话配置
数据库选择
建议在创建会话时,始终使用配置参数DatabaseName
显式指定数据库,即使在单数据库实例上也是如此。这允许驱动程序更高效地工作,因为它节省了到服务器的网络往返以解析主数据库。如果未指定数据库,则使用Neo4j实例设置中设置的默认数据库。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "neo4j",
})
通过配置方法指定数据库比使用USE Cypher子句更可取。如果服务器在集群上运行,则使用USE 的查询需要启用服务器端路由。查询的执行时间也可能更长,因为它们可能在第一次尝试时无法到达正确的集群成员,并且需要路由到包含请求数据库的成员。 |
请求路由
在集群环境中,所有会话都以写入模式打开,将它们路由到领导者。您可以通过将配置参数AccessMode
显式设置为neo4j.AccessModeRead
或neo4j.AccessModeWrite
来更改此设置。请注意,.ExecuteRead()
和.ExecuteWrite()
会自动覆盖会话的默认访问模式。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "neo4j",
AccessMode: neo4j.AccessModeRead,
})
虽然在读取模式下执行写入查询可能会导致运行时错误,但不应依赖此功能进行访问控制。这两种模式之间的区别在于,读取事务将被路由到集群的任何节点,而写入事务将被定向到领导者。换句话说,无法保证在读取模式下提交的写入查询会被拒绝。
|
以其他用户身份运行查询(模拟)
您可以使用配置参数ImpersonatedUser
在其他用户的安全上下文中执行查询,指定要模拟的用户名称。要使此功能生效,创建DriverWithContext
的用户需要具有相应的权限。模拟用户比创建新的DriverWithContext
对象更经济。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "neo4j",
ImpersonatedUser: "<somebodyElse>",
})
在模拟用户时,查询将在模拟用户的完整安全上下文中运行,而不是已认证的用户(即主数据库、权限等)。
事务配置
您可以通过向.ExecuteRead()
、.ExecuteWrite()
和.BeginTransaction()
提供配置回调来进一步控制事务。使用它们来指定
-
事务超时时间(以秒为单位)。运行时间过长的交易将由服务器终止。默认值在服务器端设置。最小值为 1 毫秒。
-
附加到事务的元数据映射。这些元数据将记录在服务器的
query.log
中,并且在SHOW TRANSACTIONS
Cypher命令的输出中可见。使用它来标记事务。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
result, _ := tx.Run(ctx, "MATCH (:Person) RETURN count(*) AS n", nil)
return result.Collect(ctx)
},
neo4j.WithTxTimeout(5*time.Second), // remember to import `time`
neo4j.WithTxMetadata(map[string]any{"appName": "peopleTracker"}))
关闭会话
每个连接池都有有限数量的会话,因此,如果您打开会话但从未关闭它们,则应用程序可能会用完这些会话。因此,建议在创建新会话后立即使用defer
关键字调用session.Close()
,以确保在所有情况下都会关闭会话。当会话关闭时,它将返回到连接池以供以后重用。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
// session usage
在某些情况下,会话关闭可能会返回错误,因此您可能也希望捕获这些情况。
术语表
- LTS
-
长期支持版本是保证支持若干年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura是Neo4j的完全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher是Neo4j的图查询语言,允许您从数据库中检索数据。它类似于SQL,但用于图。
- APOC
-
Awesome Procedures On Cypher (APOC)是一个包含(许多)函数的库,这些函数无法用Cypher本身轻松表达。
- Bolt
-
Bolt是用于Neo4j实例和驱动程序之间交互的协议。默认情况下,它监听端口 7687。
- ACID
-
原子性、一致性、隔离性和持久性(ACID)是保证数据库事务可靠处理的特性。符合ACID的DBMS确保数据库中的数据即使在发生故障时也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员都将在某个时间点存储数据的最新版本,则称该数据库最终一致。
- 因果一致性
-
如果读取和写入查询按相同的顺序被集群的每个成员看到,则称该数据库因果一致。这比最终一致性更强。
- NULL
-
空标记不是类型,而是值不存在的占位符。有关更多信息,请参阅Cypher → 使用
null
。 - 事务
-
事务是一项工作单元,要么完全提交,要么在发生故障时回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或回退,以避免从一个账户中扣除资金但未添加到另一个账户。
- 背压
-
背压是一种与数据流方向相反的力。它确保客户端不会被数据以快于其处理速度的速度淹没。
- 事务函数
-
事务函数是由
ExecuteRead
或ExecuteWrite
调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。 - DriverWithContext
-
DriverWithContext
对象包含建立与Neo4j数据库连接所需的详细信息。