运行您自己的事务

当使用 ExecuteQuery() 查询数据库时,驱动程序会自动创建一个事务。事务是一个工作单元,它要么整体提交,要么在失败时回滚。您可以在单个查询中包含多个 Cypher 语句,例如在使用 MATCHCREATE 顺序执行来更新数据库时,但您不能有多个查询并在它们之间插入客户端逻辑。

对于这些更高级的用例,驱动程序提供了函数来完全控制事务生命周期。这些被称为托管事务,您可以将它们视为解开 executableQuery() 流并能够在更多地方指定其所需行为的方式。

创建会话

在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间具体的查询通道,并确保强制执行因果一致性

会话通过 DriverWithContext.NewSession() 方法创建。使用第二个参数修改会话的配置,例如目标数据库。有关更多配置参数,请参阅会话配置

session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)

会话创建是一个轻量级操作,因此可以以较低的成本创建和销毁会话。使用完毕后,请务必关闭会话

会话是线程安全的:您可以在线程之间共享主 DriverWithContext 对象,但请确保每个例程都创建自己的会话。

运行托管事务

一个事务可以包含任意数量的查询。由于 Neo4j 符合 ACID 规范,事务中的查询要么整体执行,要么根本不执行:您不能让事务的一部分成功而另一部分失败。使用事务将相关查询组合在一起,以实现单个逻辑数据库操作。

托管事务通过 SessionWithContext.ExecuteRead()SessionWithContext.ExecuteWrite() 方法创建,具体取决于您是想从数据库中检索数据还是修改数据。这两种方法都接受一个事务函数回调,该回调负责实际执行查询和处理结果。

检索名字以 Al 开头的人
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})  (1)
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx,  (2)
    func(tx neo4j.ManagedTransaction) (any, error) {  (3)
        result, err := tx.Run(ctx, `  (4)
            MATCH (p:Person) WHERE p.name STARTS WITH $filter
            RETURN p.name AS name ORDER BY name
            `, map[string]any{
                "filter": "Al",
            })
        if err != nil {
            return nil, err
        }
        records, err := result.Collect(ctx)  (5)
        if err != nil {
            return nil, err
        }
        return records, nil
    })
for _, person := range people.([]*neo4j.Record) {
    fmt.Println(person.AsMap())
}
1 创建一个会话。单个会话可以包含多个查询。完成后请记住关闭它(这里我们打开后立即 defer 关闭)。
2 .ExecuteRead()(或 .ExecuteWrite())方法是事务的入口点。
3 事务函数回调负责运行查询。
4 使用方法 ManagedTransaction.Run() 运行查询。每次查询运行都会返回一个 ResultWithContext 对象。
5 使用 ResultWithContext 上的任何方法处理结果.Collect() 方法将所有记录检索到一个列表中。

不要将参数直接硬编码或连接到查询中。出于性能和安全考虑,请使用查询参数

事务函数不应直接返回结果对象。相反,请务必以某种方式处理结果。在事务函数中,如果 errornil,则 return 语句会导致事务被提交;如果返回的 error 值不为 nil,则事务会自动回滚。

.ExecuteRead().ExecuteWrite() 方法已取代 .ReadTransaction().WriteTransaction(),后者在 5.x 版本中已被弃用,并将在 6.0 版本中移除。
包含多个查询、客户端逻辑和潜在回滚的事务
package main

import (
    "fmt"
    "context"
    "strconv"
    "errors"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()
    var employeeThreshold int64 = 10  // Neo4j's integer maps to Go's int64

    // Connection to database
    dbUri := "{neo4j-database-uri}"
    dbUser := "{neo4j-username}"
    dbPassword := "{neo4j-password}"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    defer session.Close(ctx)

    // Create 100 people and assign them to various organizations
    for i := 0; i < 100; i++ {
        name := "Thor" + strconv.Itoa(i)
        orgId, err := session.ExecuteWrite(ctx,
            func(tx neo4j.ManagedTransaction) (any, error) {
                var orgId string

                // Create new Person node with given name, if not exists already
                _, err := tx.Run(
                    ctx,
                    "MERGE (p:Person {name: $name})",
                    map[string]any{
                        "name": name,
                    })
                if err != nil {
                    return nil, err
                }

                // Obtain most recent organization ID and the number of people linked to it
                result, err := tx.Run(
                    ctx, `
                    MATCH (o:Organization)
                    RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
                    ORDER BY o.createdDate DESC
                    LIMIT 1
                    `, nil)
                if err != nil {
                    return nil, err
                }
                org, err := result.Single(ctx)

                // If no organization exists, create one and add Person to it
                if org == nil {
                    orgId, _ = createOrganization(ctx, tx)
                    fmt.Println("No orgs available, created", orgId)
                    err = addPersonToOrganization(ctx, tx, name, orgId)
                    if err != nil {
                        return nil, errors.New("Failed to add person to new org")
                        // Transaction will roll back
                        // -> not even Person and/or Organization is created!
                    }
                } else {
                    orgId = org.AsMap()["id"].(string)
                    if employeesN := org.AsMap()["employeesN"].(int64);
                       employeesN == 0 {
                        return nil, errors.New("Most recent organization is empty")
                        // Transaction will roll back
                        // -> not even Person is created!
                    }

                    // If org does not have too many employees, add this Person to it
                    if employeesN := org.AsMap()["employeesN"].(int64);
                       employeesN < employeeThreshold {
                        err = addPersonToOrganization(ctx, tx, name, orgId)
                        if err != nil {
                            return nil, err
                            // Transaction will roll back
                            // -> not even Person is created!
                        }
                    // Otherwise, create a new Organization and link Person to it
                    } else {
                        orgId, err = createOrganization(ctx, tx)
                        if err != nil {
                            return nil, err
                            // Transaction will roll back
                            // -> not even Person is created!
                        }
                        fmt.Println("Latest org is full, created", orgId)
                        err = addPersonToOrganization(ctx, tx, name, orgId)
                        if err != nil {
                            return nil, err
                            // Transaction will roll back
                            // -> not even Person and/or Organization is created!
                        }
                    }
                }
                // Return the Organization ID to which the new Person ends up in
                return orgId, nil
            })
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println("User", name, "added to organization", orgId)
        }
    }
}

func createOrganization(ctx context.Context, tx neo4j.ManagedTransaction) (string, error) {
    result, err := tx.Run(
        ctx, `
        CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
        RETURN o.id AS id
        `, nil)
    if err != nil {
        return "", err
    }
    org, err := result.Single(ctx)
    if err != nil {
        return "", err
    }
    orgId, _ := org.AsMap()["id"]
    return orgId.(string), err
}

func addPersonToOrganization(ctx context.Context, tx neo4j.ManagedTransaction, personName string, orgId string) (error) {
    _, err := tx.Run(
        ctx, `
        MATCH (o:Organization {id: $orgId})
        MATCH (p:Person {name: $name})
        MERGE (p)-[:WORKS_FOR]->(o)
        `, map[string]any{
            "orgId": orgId,
            "name": personName,
        })
    return err
}

如果事务因驱动程序认为是瞬时原因而失败,它会自动重试运行事务函数(延迟时间呈指数级增长)。因此,事务函数必须是幂等的(即,多次运行时应产生相同的效果),因为您无法预先知道它们将执行多少次。实际上,这意味着您不应修改或依赖全局变量等。请注意,虽然事务函数可能会被执行多次,但其中的查询将始终只运行一次。

一个会话可以链式连接多个事务,但在任何给定时间,一个会话中只能有一个事务处于活动状态。要维护多个并发事务,请使用多个并发会话。

运行显式事务

您可以通过使用方法 SessionWithContext.BeginTransaction() 手动开始事务来完全控制事务。您可以使用方法 ExplicitTransaction.Run() 在显式事务中运行查询。

session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
tx, err := session.BeginTransaction(ctx)
if err != nil {
    panic(err)
}
    // use tx.Run() to run queries
    //     tx.Commit() to commit the transaction
    //     tx.Rollback() to rollback the transaction

显式事务可以通过 ExplicitTransaction.Commit() 提交,或通过 ExplicitTransaction.Rollback() 回滚。如果没有采取显式操作,驱动程序会在事务生命周期结束时自动回滚事务。

显式事务最适用于需要在同一事务中跨多个函数分发 Cypher 执行的应用程序,或需要在单个事务中运行多个查询但不需要托管事务提供的自动重试功能的应用程序。

显式事务与外部 API 交互的示意图
package main

import (
    "fmt"
    "context"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "{neo4j-database-uri}"
    dbUser := "{neo4j-username}"
    dbPassword := "{neo4j-password}"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }
    customerId, err := createCustomer(ctx, driver)
    if err != nil {
        panic(err)
    }
    otherBankId := 42
    transferToOtherBank(ctx, driver, customerId, otherBankId, 999)
}

func createCustomer(ctx context.Context, driver neo4j.DriverWithContext) (string, error) {
    result, err := neo4j.ExecuteQuery(ctx, driver, `
    MERGE (c:Customer {id: randomUUID()})
    RETURN c.id AS id
    `, nil,
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
    if err != nil {
        return "", err
    }
    customerId, _ := result.Records[0].Get("id")
    return customerId.(string), err
}

func transferToOtherBank(ctx context.Context, driver neo4j.DriverWithContext, customerId string, otherBankId int, amount float32) {
    session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    defer session.Close(ctx)
    tx, err := session.BeginTransaction(ctx)
    if err != nil {
        panic(err)
    }

    if ! customerBalanceCheck(ctx, tx, customerId, amount) {
        // give up
        return
    }

    otherBankTransferApi(ctx, customerId, otherBankId, amount)
    // Now the money has been transferred => can't rollback anymore
    // (cannot rollback external services interactions)

    err = decreaseCustomerBalance(ctx, tx, customerId, amount)
    if err != nil {
        requestInspection(ctx, customerId, otherBankId, amount, err)
    }
    err = tx.Commit(ctx)
    if err != nil {
        requestInspection(ctx, customerId, otherBankId, amount, err)
    }
}

func customerBalanceCheck(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (bool) {
    result, err := tx.Run(ctx, `
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
        `, map[string]any{
            "id": customerId,
            "amount": amount,
        })
    if err == nil {
        return false
    }
    record, err := result.Single(ctx)
    if err == nil {
        return false
    }
    sufficient := record.AsMap()["sufficient"]
    return sufficient.(bool)
}

func otherBankTransferApi(ctx context.Context, customerId string, otherBankId int, amount float32) {
    // make some API call to other bank
}

func decreaseCustomerBalance(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (error) {
    _, err := tx.Run(ctx, `
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
        `, map[string]any{
            "id": customerId,
            "amount": amount,
        })
    return err
}

func requestInspection(ctx context.Context, customerId string, otherBankId int, amount float32, err error) {
    // manual cleanup required; log this or similar
    fmt.Println("WARNING: transaction rolled back due to exception:", err)
    fmt.Println("customerId:", customerId, "otherBankId:", otherBankId, "amount:", amount)
}

处理查询结果

驱动程序的查询输出是一个 ResultWithContext 对象,它不直接包含结果记录。相反,它将 Cypher 结果封装在一个丰富的数据结构中,需要在客户端进行一些解析。需要注意的两个主要点是:

  • 结果记录并非立即由服务器完全获取并返回。相反,结果以惰性流的形式出现。特别是,当驱动程序从服务器接收到一些记录时,它们最初会缓冲在后台队列中。记录会保留在缓冲区中,直到被应用程序消费,此时它们会从缓冲区中移除。当没有更多记录可用时,结果就会耗尽

  • 结果充当一个游标。这意味着无法从流中检索上一条记录,除非您已将其保存在辅助数据结构中。

下面的动画展示了单个查询的路径:它显示了驱动程序如何处理结果记录以及应用程序应如何处理结果。

处理结果最简单的方法是调用其上的 .Collect(ctx),它会生成一个 Record 对象数组。此外,ResultWithContext 对象还实现了许多用于处理记录的方法。下面列出了最常用的方法。

名称 描述

Collect(ctx) ([]*Record, error)

将结果的其余部分作为列表返回。

Single(ctx) (*Record, error)

返回下一个且是唯一剩余的记录,如果不存在则返回 nil。调用此方法总是会耗尽结果。

如果有多个(或少于一个)记录可用,则返回非 nil 错误。

Record() *Record

返回当前记录。

Next(ctx) bool

如果当前记录之后有待处理的记录,则返回 true。在这种情况下,它还会推进结果迭代器。

Consume(ctx) (ResultSummary, error)

返回查询结果摘要。它会耗尽结果,因此只应在数据处理完成后调用。

有关 ResultWithContext 方法的完整列表,请参阅 API 文档 → ResultWithContext

会话配置

数据库选择

建议在创建会话时始终使用配置参数 DatabaseName 明确指定数据库,即使是在单数据库实例上也是如此。这使得驱动程序能够更高效地工作,因为它节省了到服务器的网络往返来解析主数据库。如果没有指定数据库,则使用 Neo4j 实例设置中配置的默认数据库

session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
})
建议通过配置方法指定数据库,而不是使用 USE Cypher 子句。如果服务器在集群上运行,带有 USE 的查询需要启用服务器端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要被路由到包含所请求数据库的成员。

请求路由

在集群环境中,所有会话都以写入模式打开,并将其路由到主节点。您可以通过将配置参数 AccessMode 明确设置为 neo4j.AccessModeReadneo4j.AccessModeWrite 来更改此设置。请注意,.ExecuteRead().ExecuteWrite() 会自动覆盖会话的默认访问模式。

session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
    AccessMode: neo4j.AccessModeRead,
})

尽管在读取模式下执行写入查询很可能会导致运行时错误,但您不应依赖此进行访问控制。 这两种模式的区别在于,读取事务将被路由到集群的任何节点,而写入事务则被定向到主节点。换句话说,不能保证以读取模式提交的写入查询会被拒绝。

类似的说明也适用于 .ExecuteRead().ExecuteWrite() 方法。

以不同用户身份运行查询

您可以使用配置选项 Auth 以不同用户身份执行查询。在会话级别切换用户比创建新的 DriverWithContext 对象成本更低。然后,查询将在给定用户的安全上下文(即主数据库、权限等)中运行。
会话范围的身份验证要求服务器版本 >= 5.8。

sessionAuth := neo4j.BasicAuth("somebodyElse", "theirPassword", "")
session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
    Auth: &sessionAuth,
})

选项 ImpersonatedUser 提供了类似的功能,并且在驱动程序/服务器版本 >= 4.4 中可用。区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver 的用户需要具有相应的权限

session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
    ImpersonatedUser: "<somebodyElse>",
})

事务配置

您可以通过向 .ExecuteRead().ExecuteWrite().BeginTransaction() 提供配置回调来进一步控制事务。使用它们来指定

  • 事务超时(秒)。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒。

  • 附加到事务的元数据映射。这些元数据会记录在服务器的 query.log 中,并在 SHOW TRANSACTIONS Cypher 命令的输出中可见。使用此功能标记事务。

session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        result, _ := tx.Run(ctx, "MATCH (:Person) RETURN count(*) AS n", nil)
        return result.Collect(ctx)
    },
    neo4j.WithTxTimeout(5*time.Second),  // remember to import `time`
    neo4j.WithTxMetadata(map[string]any{"appName": "peopleTracker"}))

关闭会话

每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话。因此,建议您在创建新会话后立即使用 defer 关键字调用 session.Close(),以确保在所有情况下都能关闭会话。当会话关闭时,它会返回到连接池以供以后重用。

session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
// session usage

在某些极端情况下,会话关闭可能会返回错误,因此您可能也需要捕获这些情况。

词汇表

LTS

长期支持版本是指保证支持多年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将拥有一个 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但专用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个函数库,其中包含许多无法在 Cypher 本身中轻松表达的函数。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。

ACID

原子性、一致性、隔离性、持久性 (ACID) 是确保数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。

最终一致性

如果数据库能保证所有集群成员在某个时间点最终会存储最新版本的数据,则该数据库是最终一致的。

因果一致性

如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。

NULL

null 标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅Cypher → 使用 null

事务

事务是一个工作单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免钱从一个账户扣除但未添加到另一个账户。

背压

背压是一种抵抗数据流动的力。它确保客户端不会被数据淹没,即数据传输速度不会快于客户端处理速度。

事务函数

事务函数是由 ExecuteReadExecuteWrite 调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。

DriverWithContext

DriverWithContext 对象包含与 Neo4j 数据库建立连接所需的详细信息。

© . All rights reserved.