运行并发事务
您可以利用 Goroutines 和通道 来运行并发查询,或者将查询结果的处理委托给多个线程。下面的示例也使用 Go sync
包 来协调不同的例程。如果您不熟悉 Go 中的并发,请查看 Go 编程语言 → Go 并发模式:管道和取消。
如果您需要跨不同事务的因果一致性,请使用 书签。
并发处理查询结果集(使用会话)
以下示例演示如何将查询结果流式传输到通道,并让多个消费者并发处理其记录。
package main
import (
"fmt"
"context"
"time"
"sync"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
func main() {
ctx := context.Background()
// Connection to database
dbUri := "<URI for Neo4j database>"
dbUser := "<Username>"
dbPassword := "<Password>"
driver, err := neo4j.NewDriverWithContext(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
// Run a query and get results in a channel
recordsC := queryToChannel(ctx, driver) (1)
// Spawn some consumers that will process records
// They communicate back on the log channel
// WaitGroup allows to keep track of progress and close channel when all are done
log := make(chan string) (4)
wg := &sync.WaitGroup{} (5)
for i := 1; i < 10; i++ { // i starts from 1 because 0th receiver would process too fast
wg.Add(1)
go consumer(wg, recordsC, log, i) (6)
}
// When all consumers are done, close log channel
go func() {
wg.Wait()
close(log)
}()
// Print log as it comes
for v := range log {
fmt.Println(v)
}
}
func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan *neo4j.Record {
recordsC := make(chan *neo4j.Record, 10) (2)
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
go session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
// Neo4j query to create and retrieve some nodes
result, err := tx.Run(ctx, `
UNWIND range(1,25) AS id
MERGE (p:Person {id: id})
RETURN p
`, nil)
if err != nil {
panic(err)
}
// Stream results to channel as they come from the server
for result.Next(ctx) { (3)
record := result.Record()
recordsC <- record
}
close(recordsC)
return nil, err
})
return recordsC
}
func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan string, n int) {
defer wg.Done() // will communicate that routine is done
for record := range records {
log <- fmt.Sprintf("Receiver %v processed %v", n, record)
time.Sleep(time.Duration(n) * time.Second) // proxy for a time-consuming processing
}
}
1 | 一个 Goroutine 使用 管理的事务 将查询运行到 Neo4j 服务器。请注意,驱动程序会话是在例程内部创建的,因为会话不是线程安全的。 |
2 | 通道 recordsC 是查询结果记录流式传输到的位置。来自 .ExecuteWrite() 的事务函数写入它,而各种 consumer 从中读取。它被缓冲,以便驱动程序不会比消费者可以处理的更快地检索记录。 |
3 | 来自服务器的每个结果记录都会通过 recordsC 通道发送。只要有待处理的记录,流式传输就会继续,之后通道将关闭,例程退出。 |
4 | 通道 log 是消费者相互通信的地方。 |
5 | 需要一个 sync.WaitGroup 来了解所有消费者何时完成,从而可以关闭 log 通道。 |
6 | 在单独的 Goroutine 中启动了多个 consumer 。每个消费者从 recordsC 通道读取并处理记录。每个消费者都使用睡眠计时器模拟长时间操作。 |
并发运行多个查询(使用 ExecuteQuery()
)
以下示例演示如何并发运行多个查询。
package main
import (
"fmt"
"context"
"sync"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
func main() {
ctx := context.Background()
// Connection to database
dbUri := "<URI for Neo4j database>"
dbUser := "<Username>"
dbPassword := "<Password>"
driver, err := neo4j.NewDriverWithContext(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
log := make(chan string) (1)
wg := &sync.WaitGroup{} (2)
// Spawn 10 concurrent queries
for i := 0; i < 10; i++ {
wg.Add(1)
go runQuery(wg, ctx, driver, log) (3)
}
// Wait for all runner routines to be done before closing log
go func() {
wg.Wait()
close(log)
}()
// Print log
for msg := range log {
fmt.Println(msg)
}
}
// Run Neo4j query with random sleep time, returning the sleep time in ms
func runQuery(wg *sync.WaitGroup, ctx context.Context, driver neo4j.DriverWithContext, log chan string) {
defer wg.Done() // will communicate that routine is done
result, err := neo4j.ExecuteQuery(ctx, driver, `
WITH round(rand()*2000) AS waitTime
CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
`, nil, neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
if err != nil {
log <- fmt.Sprintf("ERROR: %v", err)
} else {
neo, _ := result.Records[0].Get("time")
log <- fmt.Sprintf("Query returned %v", neo)
}
}
1 | log 通道是所有查询例程相互通信的地方。 |
2 | 需要一个 sync.WaitGroup 来了解所有查询例程何时完成,从而可以关闭日志通道。 |
3 | 运行十个不同的查询,每个查询都在自己的 Go 例程中运行。它们独立且并发地运行,并向共享的 log 通道报告。 |
词汇表
- LTS
-
长期支持版本是指保证支持若干年的版本。Neo4j 4.4 是 LTS,Neo4j 5 也将有 LTS 版本。
- Aura
-
Aura 是 Neo4j 的完全托管的云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,它允许您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Cypher 上的强大过程 (APOC) 是一个包含(许多)函数的库,这些函数无法在 Cypher 本身中轻松表达。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认情况下,它监听端口 7687。
- ACID
-
原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员在某个时间点存储数据的最新版本,那么它就是最终一致的。
- 因果一致性
-
如果数据库保证每个集群成员以相同的顺序看到读写查询,那么它就是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是类型,而是值不存在的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是一个工作单元,它要么完全提交,要么在失败时回滚。例如,银行转账:它涉及多个步骤,但这些步骤必须全部成功才能完成或被撤回,以避免从一个帐户中扣除资金但未添加到另一个帐户中。
- 背压
-
背压是抵抗数据流的力。它确保客户端不会因数据流速过快而不堪重负。
- 事务函数
-
事务函数是在
ExecuteRead
或ExecuteWrite
调用时执行的回调。驱动程序会自动在服务器发生故障时重新执行回调。 - DriverWithContext
-
一个
DriverWithContext
对象包含建立与 Neo4j 数据库连接所需的信息。