协调并行事务
ExecuteQuery()
的书签
当使用 ExecuteQuery()
查询数据库时,驱动程序会为您管理书签。在这种情况下,您保证后续查询无需采取进一步操作即可读取之前的更改。
neo4j.ExecuteQuery(ctx, driver, "<QUERY 1>", nil,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
// subsequent ExecuteQuery calls will be causally chained
neo4j.ExecuteQuery(ctx, driver, "<QUERY 2>", nil, // can read result of <QUERY 1>
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
neo4j.ExecuteQuery(ctx, driver, "<QUERY 3>", nil, // can read result of <QUERY 2>
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
要禁用书签管理和因果一致性,请在 ExecuteQuery()
调用中使用配置回调 neo4j.ExecuteQueryWithoutBookmarkManager()
。
neo4j.ExecuteQuery(
ctx, driver, "<QUERY>", nil, neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"),
neo4j.ExecuteQueryWithoutBookmarkManager())
单个会话中的书签
在单个会话中运行的查询会自动进行书签管理,因此您可以相信一个会话内的查询是因果链式的。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return tx.Run(ctx, "<QUERY 1>", nil)
})
session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return tx.Run(ctx, "<QUERY 2>", nil) // can read QUERY 1
})
session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return tx.Run(ctx, "<QUERY 3>", nil) // can read QUERY 1 and 2
})
跨多个会话的书签
如果您的应用程序使用多个会话,您可能需要确保一个会话已完成其所有事务后,另一个会话才被允许运行其查询。
在下面的示例中,sessionA
和 sessionB
允许并发运行,而 sessionC
则会等待它们的结果传播完毕。这保证了 sessionC
想要操作的 Person
节点确实存在。
package main
import (
"fmt"
"context"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
func main() {
ctx := context.Background()
// Connection to database
dbUri := "{neo4j-database-uri}"
dbUser := "{neo4j-username}"
dbPassword := "{neo4j-password}"
driver, err := neo4j.NewDriverWithContext(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
// Bookmarks holder
var savedBookmarks neo4j.Bookmarks
// All function calls below may return errors,
// we don't catch them here for simplicity.
// Create the first person and employment relationship
sessionA := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
createPerson(ctx, sessionA, "Alice")
employ(ctx, sessionA, "Alice", "Wayne Enterprises")
savedBookmarks = neo4j.CombineBookmarks(savedBookmarks, sessionA.LastBookmarks()) (1)
sessionA.Close(ctx)
// Create the second person and employment relationship
sessionB := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
createPerson(ctx, sessionB, "Bob")
employ(ctx, sessionB, "Bob", "LexCorp")
savedBookmarks = neo4j.CombineBookmarks(savedBookmarks, sessionB.LastBookmarks()) (1)
sessionB.Close(ctx)
// Create a friendship between the two people created above
sessionC := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "neo4j",
Bookmarks: savedBookmarks, (2)
})
createFriendship(ctx, sessionC, "Alice", "Bob")
printFriendships(ctx, sessionC)
}
// Create a Person node
func createPerson(ctx context.Context, session neo4j.SessionWithContext, name string) (any, error) {
return session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return tx.Run(ctx,
"MERGE (:Person {name: $name})",
map[string]any{"name": name})
})
}
// Create an employment relationship to a pre-existing company node
// This relies on the person first having been created
func employ(ctx context.Context, session neo4j.SessionWithContext, personName string, companyName string) (any, error) {
return session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return session.Run(ctx, `
MATCH (person:Person {name: $person_name})
MATCH (company:Company {name: $company_name})
MERGE (person)-[:WORKS_FOR]->(company)
`, map[string]any{
"personName": personName,
"companyName": companyName,
})
})
}
// Create a friendship between two people
func createFriendship(ctx context.Context, session neo4j.SessionWithContext, nameA string, nameB string) (any, error) {
return session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return session.Run(ctx, `
MATCH (a:Person {name: $nameA})
MATCH (b:Person {name: $nameB})
MERGE (a)-[:KNOWS]->(b)
`, map[string]any{
"nameA": nameA,
"nameB": nameB,
})
})
}
// Retrieve and display all friendships
func printFriendships(ctx context.Context, session neo4j.SessionWithContext) (any, error) {
return session.ExecuteRead(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
result, err := session.Run(ctx,
"MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name",
nil)
if err != nil {
return nil, err
}
records, _ := result.Collect(ctx)
for _, record := range records {
nameA, _ := record.Get("a.name")
nameB, _ := record.Get("b.name")
fmt.Println(nameA, "knows", nameB)
}
return nil, nil
})
}
1 | 使用 SessionWithContext.LastBookmarks() 和 neo4j.CombineBookmarks() 从不同会话收集并合并书签,并将它们存储在 Bookmarks 对象中。 |
2 | 使用它们通过 Bookmarks 配置参数初始化另一个会话。 |
书签的使用可能会对性能产生负面影响,因为所有查询都被迫等待最新更改在集群中传播。对于简单的用例,请尝试将查询分组到单个事务中,或单个会话中。 |
混合使用 ExecuteQuery()
和会话
为了确保部分使用 ExecuteQuery()
执行、部分使用会话执行的事务之间的因果一致性,您可以在会话创建时使用参数 BookmarkManager
,并将其设置为 driver.ExecuteQueryBookmarkManager()
。由于这是 ExecuteQuery()
调用的默认书签管理器,这将确保所有工作都在同一个书签管理器下执行,从而实现因果一致。
neo4j.ExecuteQuery(ctx, driver, "<QUERY 1>", nil,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "neo4j",
BookmarkManager: driver.ExecuteQueryBookmarkManager(),
})
// every query inside this session will be causally chained
// (i.e., can read what was written by <QUERY 1>)
session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
return tx.Run(ctx, "<QUERY 2>", nil)
})
session.Close(ctx)
// subsequent ExecuteQuery calls will be causally chained
// (i.e., can read what was written by <QUERY 2>)
neo4j.ExecuteQuery(ctx, driver, "<QUERY 3>", nil,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("neo4j"))
术语表
- LTS
-
长期支持 (Long Term Support) 版本是指保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将拥有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,它允许您从数据库中检索数据。它类似于 SQL,但专用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含许多难以在 Cypher 本身中轻松表达的函数的库。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员都将在某个时间点存储数据的最新版本,则该数据库是最终一致的。
- 因果一致性
-
如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。
- NULL
-
null 标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是工作的一个单元,它要么整体被提交,要么在失败时被回滚。一个例子是银行转账:它涉及多个步骤,但所有步骤都必须全部成功或被撤销,以避免钱从一个账户中扣除但未添加到另一个账户的情况。
- 反压
-
反压是与数据流相反的作用力。它确保客户端不会被超出其处理能力的数据所淹没。
- 事务函数
-
事务函数是由
ExecuteRead
或ExecuteWrite
调用执行的回调。在服务器故障时,驱动程序会自动重新执行该回调。 - DriverWithContext
-
一个
DriverWithContext
对象包含与 Neo4j 数据库建立连接所需的详细信息。