运行并发事务

您可以利用 Goroutines 和通道 来运行并发查询,或者将查询结果的处理委托给多个线程。下面的示例也使用 Go sync 来协调不同的例程。如果您不熟悉 Go 中的并发,请查看 Go 编程语言 → Go 并发模式:管道和取消

如果您需要跨不同事务的因果一致性,请使用 书签

并发处理查询结果集(使用会话)

以下示例演示如何将查询结果流式传输到通道,并让多个消费者并发处理其记录。

package main

import (
    "fmt"
    "context"
    "time"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    // Run a query and get results in a channel
    recordsC := queryToChannel(ctx, driver)  (1)

    // Spawn some consumers that will process records
    // They communicate back on the log channel
    // WaitGroup allows to keep track of progress and close channel when all are done
    log := make(chan string)  (4)
    wg := &sync.WaitGroup{}  (5)
    for i := 1; i < 10; i++ {  // i starts from 1 because 0th receiver would process too fast
        wg.Add(1)
        go consumer(wg, recordsC, log, i)  (6)
    }
    // When all consumers are done, close log channel
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log as it comes
    for v := range log {
        fmt.Println(v)
    }
}

func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan *neo4j.Record {
    recordsC := make(chan *neo4j.Record, 10)  (2)
    session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    defer session.Close(ctx)
    go session.ExecuteWrite(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            // Neo4j query to create and retrieve some nodes
            result, err := tx.Run(ctx, `
                UNWIND range(1,25) AS id
                MERGE (p:Person {id: id})
                RETURN p
                `, nil)
            if err != nil {
                panic(err)
            }
            // Stream results to channel as they come from the server
            for result.Next(ctx) {  (3)
                record := result.Record()
                recordsC <- record
            }
            close(recordsC)
            return nil, err
        })
    return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan string, n int) {
    defer wg.Done()  // will communicate that routine is done
    for record := range records {
        log <- fmt.Sprintf("Receiver %v processed %v", n, record)
        time.Sleep(time.Duration(n) * time.Second)  // proxy for a time-consuming processing
    }
}
1 一个 Goroutine 使用 管理的事务 将查询运行到 Neo4j 服务器。请注意,驱动程序会话是在例程内部创建的,因为会话不是线程安全的。
2 通道 recordsC 是查询结果记录流式传输到的位置。来自 .ExecuteWrite() 的事务函数写入它,而各种 consumer 从中读取。它被缓冲,以便驱动程序不会比消费者可以处理的更快地检索记录。
3 来自服务器的每个结果记录都会通过 recordsC 通道发送。只要有待处理的记录,流式传输就会继续,之后通道将关闭,例程退出。
4 通道 log 是消费者相互通信的地方。
5 需要一个 sync.WaitGroup 来了解所有消费者何时完成,从而可以关闭 log 通道。
6 在单独的 Goroutine 中启动了多个 consumer。每个消费者从 recordsC 通道读取并处理记录。每个消费者都使用睡眠计时器模拟长时间操作。

并发运行多个查询(使用 ExecuteQuery()

以下示例演示如何并发运行多个查询。

package main

import (
    "fmt"
    "context"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    log := make(chan string)  (1)
    wg := &sync.WaitGroup{}  (2)
    // Spawn 10 concurrent queries
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go runQuery(wg, ctx, driver, log)  (3)
    }
    // Wait for all runner routines to be done before closing log
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log
    for msg := range log {
        fmt.Println(msg)
    }
}

// Run Neo4j query with random sleep time, returning the sleep time in ms
func runQuery(wg *sync.WaitGroup, ctx context.Context, driver neo4j.DriverWithContext, log chan string) {
    defer wg.Done()  // will communicate that routine is done
    result, err := neo4j.ExecuteQuery(ctx, driver, `
        WITH round(rand()*2000) AS waitTime
        CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
        `, nil, neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("neo4j"))
    if err != nil {
        log <- fmt.Sprintf("ERROR: %v", err)
    } else {
        neo, _ := result.Records[0].Get("time")
        log <- fmt.Sprintf("Query returned %v", neo)
    }
}
1 log 通道是所有查询例程相互通信的地方。
2 需要一个 sync.WaitGroup 来了解所有查询例程何时完成,从而可以关闭日志通道。
3 运行十个不同的查询,每个查询都在自己的 Go 例程中运行。它们独立且并发地运行,并向共享的 log 通道报告。

词汇表

LTS

长期支持版本是指保证支持若干年的版本。Neo4j 4.4 是 LTS,Neo4j 5 也将有 LTS 版本。

Aura

Aura 是 Neo4j 的完全托管的云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,它允许您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Cypher 上的强大过程 (APOC) 是一个包含(许多)函数的库,这些函数无法在 Cypher 本身中轻松表达。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认情况下,它监听端口 7687。

ACID

原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。

最终一致性

如果数据库保证所有集群成员在某个时间点存储数据的最新版本,那么它就是最终一致的。

因果一致性

如果数据库保证每个集群成员以相同的顺序看到读写查询,那么它就是因果一致的。这比最终一致性更强。

NULL

空标记不是类型,而是值不存在的占位符。有关更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,它要么完全提交,要么在失败时回滚。例如,银行转账:它涉及多个步骤,但这些步骤必须全部成功才能完成或被撤回,以避免从一个帐户中扣除资金但未添加到另一个帐户中。

背压

背压是抵抗数据流的力。它确保客户端不会因数据流速过快而不堪重负。

事务函数

事务函数是在 ExecuteReadExecuteWrite 调用时执行的回调。驱动程序会自动在服务器发生故障时重新执行回调。

DriverWithContext

一个 DriverWithContext 对象包含建立与 Neo4j 数据库连接所需的信息。