运行并发事务

您可以使用 Goroutine 和通道 来运行并发查询,或将查询结果的处理委托给多个线程。以下示例还使用 Go sync 来协调不同的协程。如果您不熟悉 Go 中的并发,请查看 Go 编程语言 → Go 并发模式:管道和取消

如果您需要在不同事务之间保持因果一致性,请使用书签

并发处理查询结果集(使用会话)

以下示例展示了如何将查询结果流式传输到通道,并由多个消费者并发处理其记录。

package main

import (
    "fmt"
    "context"
    "time"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "{neo4j-database-uri}"
    dbUser := "{neo4j-username}"
    dbPassword := "{neo4j-password}"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    // Run a query and get results in a channel
    recordsC := queryToChannel(ctx, driver)  (1)

    // Spawn some consumers that will process records
    // They communicate back on the log channel
    // WaitGroup allows to keep track of progress and close channel when all are done
    log := make(chan string)  (4)
    wg := &sync.WaitGroup{}  (5)
    for i := 1; i < 10; i++ {  // i starts from 1 because 0th receiver would process too fast
        wg.Add(1)
        go consumer(wg, recordsC, log, i)  (6)
    }
    // When all consumers are done, close log channel
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log as it comes
    for v := range log {
        fmt.Println(v)
    }
}

func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan *neo4j.Record {
    recordsC := make(chan *neo4j.Record, 10)  (2)
    session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    defer session.Close(ctx)
    go session.ExecuteWrite(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            // Neo4j query to create and retrieve some nodes
            result, err := tx.Run(ctx, `
                UNWIND range(1,25) AS id
                MERGE (p:Person {id: id})
                RETURN p
                `, nil)
            if err != nil {
                panic(err)
            }
            // Stream results to channel as they come from the server
            for result.Next(ctx) {  (3)
                record := result.Record()
                recordsC <- record
            }
            close(recordsC)
            return nil, err
        })
    return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan string, n int) {
    defer wg.Done()  // will communicate that routine is done
    for record := range records {
        log <- fmt.Sprintf("Receiver %v processed %v", n, record)
        time.Sleep(time.Duration(n) * time.Second)  // proxy for a time-consuming processing
    }
}
1 一个 Goroutine 使用托管事务向 Neo4j 服务器运行查询。请注意,驱动程序会话是在协程内部创建的,因为会话不是线程安全的。
2 recordsC 通道是查询结果记录流式传输到的地方。来自 .ExecuteWrite() 的事务函数向其写入,而各个 consumer 则从中读取。它是带缓冲的,这样驱动程序就不会以比消费者处理速度更快的速度检索记录。
3 来自服务器的每个结果记录都会通过 recordsC 通道发送。只要有记录需要处理,流式传输就会继续,之后通道关闭,协程退出。
4 log 通道是消费者进行通信的地方。
5 需要一个 sync.WaitGroup 来知道所有消费者何时完成,从而可以关闭 log 通道。
6 多个 consumer 在独立的 Goroutine 中启动。每个消费者从 recordsC 通道读取并处理记录。每个消费者都通过一个睡眠计时器模拟一个耗时操作。

并发运行多个查询(使用 ExecuteQuery()

以下示例展示了如何并发运行多个查询。

package main

import (
    "fmt"
    "context"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "{neo4j-database-uri}"
    dbUser := "{neo4j-username}"
    dbPassword := "{neo4j-password}"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    log := make(chan string)  (1)
    wg := &sync.WaitGroup{}  (2)
    // Spawn 10 concurrent queries
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go runQuery(wg, ctx, driver, log)  (3)
    }
    // Wait for all runner routines to be done before closing log
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log
    for msg := range log {
        fmt.Println(msg)
    }
}

// Run Neo4j query with random sleep time, returning the sleep time in ms
func runQuery(wg *sync.WaitGroup, ctx context.Context, driver neo4j.DriverWithContext, log chan string) {
    defer wg.Done()  // will communicate that routine is done
    result, err := neo4j.ExecuteQuery(ctx, driver, `
        WITH round(rand()*2000) AS waitTime
        CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
        `, nil, neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("neo4j"))
    if err != nil {
        log <- fmt.Sprintf("ERROR: %v", err)
    } else {
        neo, _ := result.Records[0].Get("time")
        log <- fmt.Sprintf("Query returned %v", neo)
    }
}
1 log 通道是所有查询协程进行通信的地方。
2 需要一个 sync.WaitGroup 来知道所有查询协程何时完成,从而可以关闭 log 通道。
3 十个不同的查询正在运行,每个查询都在自己的 Go 协程中。它们独立并行地运行,并向共享的 log 通道报告。

词汇表

LTS

长期支持版本是指保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但专用于图。

APOC

Cypher 上的优秀过程 (Awesome Procedures On Cypher, APOC) 是一个库,包含许多无法在 Cypher 本身轻松表达的函数。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。它默认监听端口 7687。

ACID

原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)(ACID)是保证数据库事务可靠处理的属性。符合 ACID 的数据库管理系统可确保数据库中的数据在发生故障时仍保持准确和一致。

最终一致性

如果数据库保证所有集群成员在某个时间点都会存储数据的最新版本,那么该数据库就是最终一致的。

因果一致性

如果集群的每个成员都以相同的顺序看到读写查询,那么数据库就是因果一致的。这比最终一致性更强。

NULL

null 标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅Cypher → 使用 null

事务

事务是一个工作单元,它要么整体提交,要么在失败时回滚。例如银行转账:它涉及多个步骤,但所有步骤都必须成功或被撤销,以避免钱从一个账户中扣除但未添加到另一个账户。

背压

背压是阻碍数据流动的力。它确保客户端不会被超出其处理能力的数据量所淹没。

事务函数

事务函数是由 ExecuteReadExecuteWrite 调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。

DriverWithContext

一个 DriverWithContext 对象包含与 Neo4j 数据库建立连接所需的详细信息。

© . All rights reserved.