协调并行事务

在使用 Neo4j 集群时,在大多数情况下,默认情况下会强制执行因果一致性,这保证查询能够读取先前查询所做的更改。但是,对于并行运行的多个事务,默认情况下不会发生这种情况。在这种情况下,您可以使用书签让一个事务等待另一个事务的结果传播到整个集群,然后再运行自己的工作。这不是一项要求,并且**只有在您需要跨不同事务的因果一致性时才应使用书签**,因为等待书签可能会对性能产生负面影响。

书签是一个表示数据库某些状态的令牌。通过将一个或多个书签与查询一起传递,服务器将确保在建立表示的状态之前不会执行查询。

使用 ExecuteQuery() 的书签

使用 ExecuteQuery() 查询数据库时,驱动程序会为您管理书签。在这种情况下,您可以保证后续查询可以在不采取进一步操作的情况下读取先前的更改。

neo4j.ExecuteQuery(ctx, driver, "<QUERY 1>", nil,
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

// subsequent ExecuteQuery calls will be causally chained

neo4j.ExecuteQuery(ctx, driver, "<QUERY 2>", nil,  // can read result of <QUERY 1>
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
neo4j.ExecuteQuery(ctx, driver, "<QUERY 3>", nil,  // can read result of <QUERY 2>
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

要禁用书签管理和因果一致性,请在 ExecuteQuery() 调用中使用配置回调 neo4j.ExecuteQueryWithoutBookmarkManager()

neo4j.ExecuteQuery(
    ctx, driver, "<QUERY>", nil, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"),
    neo4j.ExecuteQueryWithoutBookmarkManager())

单个会话中的书签

对于在单个会话中运行的查询,书签管理会自动发生,因此您可以相信一个会话内的查询是因果链接的。

session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
session.ExecuteWrite(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "<QUERY 1>", nil)
})
session.ExecuteWrite(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "<QUERY 2>", nil)  // can read QUERY 1
})
session.ExecuteWrite(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "<QUERY 3>", nil)  // can read QUERY 1 and 2
})

跨多个会话的书签

如果您的应用程序使用多个会话,则可能需要确保一个会话已完成其所有事务,然后才能允许另一个会话运行其查询。

在下面的示例中,sessionAsessionB 允许并发运行,而 sessionC 则等待其结果传播。这保证了 sessionC 要操作的 Person 节点确实存在。

使用书签协调多个会话
package main

import (
    "fmt"
    "context"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    // Bookmarks holder
    var savedBookmarks neo4j.Bookmarks

    // All function calls below may return errors,
    // we don't catch them here for simplicity.

    // Create the first person and employment relationship
    sessionA := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    createPerson(ctx, sessionA, "Alice")
    employ(ctx, sessionA, "Alice", "Wayne Enterprises")
    savedBookmarks = neo4j.CombineBookmarks(savedBookmarks, sessionA.LastBookmarks())  (1)
    sessionA.Close(ctx)

    // Create the second person and employment relationship
    sessionB := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    createPerson(ctx, sessionB, "Bob")
    employ(ctx, sessionB, "Bob", "LexCorp")
    savedBookmarks = neo4j.CombineBookmarks(savedBookmarks, sessionB.LastBookmarks())  (1)
    sessionB.Close(ctx)

    // Create a friendship between the two people created above
    sessionC := driver.NewSession(ctx, neo4j.SessionConfig{
        DatabaseName: "neo4j",
        Bookmarks: savedBookmarks,  (2)
    })
    createFriendship(ctx, sessionC, "Alice", "Bob")
    printFriendships(ctx, sessionC)
}

// Create a Person node
func createPerson(ctx context.Context, session neo4j.SessionWithContext, name string) (any, error) {
    return session.ExecuteWrite(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            return tx.Run(ctx,
                "MERGE (:Person {name: $name})",
                map[string]any{"name": name})
        })
}

// Create an employment relationship to a pre-existing company node
// This relies on the person first having been created
func employ(ctx context.Context, session neo4j.SessionWithContext, personName string, companyName string) (any, error) {
    return session.ExecuteWrite(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            return session.Run(ctx, `
                MATCH (person:Person {name: $person_name})
                MATCH (company:Company {name: $company_name})
                MERGE (person)-[:WORKS_FOR]->(company)
                `, map[string]any{
                    "personName": personName,
                    "companyName": companyName,
                })
        })
}

// Create a friendship between two people
func createFriendship(ctx context.Context, session neo4j.SessionWithContext, nameA string, nameB string) (any, error) {
    return session.ExecuteWrite(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            return session.Run(ctx, `
                MATCH (a:Person {name: $nameA})
                MATCH (b:Person {name: $nameB})
                MERGE (a)-[:KNOWS]->(b)
                `, map[string]any{
                    "nameA": nameA,
                    "nameB": nameB,
                })
        })
}

// Retrieve and display all friendships
func printFriendships(ctx context.Context, session neo4j.SessionWithContext) (any, error) {
    return session.ExecuteRead(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            result, err := session.Run(ctx,
                "MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name",
                nil)
            if err != nil {
                return nil, err
            }
            records, _ := result.Collect(ctx)
            for _, record := range records {
                nameA, _ := record.Get("a.name")
                nameB, _ := record.Get("b.name")
                fmt.Println(nameA, "knows", nameB)
            }
            return nil, nil
        })
}
1 使用 SessionWithContext.LastBookmarks()neo4j.CombineBookmarks() 收集和组合来自不同会话的书签,并将它们存储在Bookmarks 对象中。
2 使用它们使用 Bookmarks 配置参数初始化另一个会话。

driver passing bookmarks

书签的使用可能会对性能产生负面影响,因为所有查询都必须等待最新更改传播到整个集群。对于简单的用例,请尝试将查询分组到单个事务中,或单个会话中。

混合 ExecuteQuery() 和会话

为了确保部分使用 ExecuteQuery() 和部分使用会话执行的事务之间的因果一致性,您可以在创建会话时使用参数 BookmarkManager,将其设置为 driver.ExecuteQueryBookmarkManager()。由于这是 ExecuteQuery() 调用的默认书签管理器,因此这将确保所有工作都在相同的书签管理器下执行,从而确保因果一致性。

neo4j.ExecuteQuery(ctx, driver, "<QUERY 1>", nil,
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
    BookmarkManager: driver.ExecuteQueryBookmarkManager(),
})
// every query inside this session will be causally chained
// (i.e., can read what was written by <QUERY 1>)
session.ExecuteWrite(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "<QUERY 2>", nil)
})
session.Close(ctx)

// subsequent ExecuteQuery calls will be causally chained
// (i.e., can read what was written by <QUERY 2>)
neo4j.ExecuteQuery(ctx, driver, "<QUERY 3>", nil,
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

词汇表

LTS

长期支持版本是保证支持若干年的版本。Neo4j 4.4 是 LTS,Neo4j 5 也将有一个 LTS 版本。

Aura

Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Cypher 上的强大过程 (APOC) 是一个(许多)函数库,这些函数本身无法用 Cypher 轻松表达。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互使用的协议。默认情况下,它侦听端口 7687。

ACID

原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。

最终一致性

如果数据库保证所有集群成员将在某个时间点存储数据的最新版本,则该数据库最终是一致的。

因果一致性

如果读写查询按相同的顺序被集群的每个成员看到,则数据库是因果一致的。这比最终一致性更强。

NULL

空标记不是一种类型,而是值不存在的占位符。有关更多信息,请参阅Cypher → 使用 null

事务

事务是工作单元,要么完全提交,要么在失败时回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免从一个账户中扣除资金但没有添加到另一个账户中。

背压

背压是阻碍数据流动的力量。它确保客户端不会被数据淹没,超过其处理能力。

事务函数

事务函数是由 ExecuteReadExecuteWrite 调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

DriverWithContext

DriverWithContext 对象包含建立与 Neo4j 数据库连接所需的详细信息。