性能建议

始终指定目标数据库

在所有查询中指定目标数据库,可以使用 ExecuteQueryWithDatabase() 配置回调在 ExecuteQuery() 或使用 DatabaseName 配置参数在创建新会话时。如果未提供数据库,驱动程序必须向服务器发送额外的请求才能确定默认数据库。单个查询的开销很小,但在数百个查询中会变得很大。

良好做法

result, err := neo4j.ExecuteQuery(ctx, driver, "<QUERY>", nil,
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{
    DatabaseName: "neo4j",
})

不良做法

result, err := neo4j.ExecuteQuery(ctx, driver, "<QUERY>", nil,
    neo4j.EagerResultTransformer)
session := driver.NewSession(ctx, neo4j.SessionConfig{})

了解事务的成本

通过 ExecuteQuery() 或通过 .ExecuteRead/Write() 提交查询时,服务器会自动将它们包装到 事务 中。此行为确保数据库始终处于一致状态,无论事务执行期间发生什么(电源故障、软件崩溃等)。

在多个查询周围创建安全的执行上下文会产生开销,而如果驱动程序只是将查询发送到服务器并希望它们能够顺利执行,则不会产生开销。开销很小,但随着查询数量的增加而增加。因此,如果您的用例更重视吞吐量而不是数据完整性,则可以通过在单个(自动提交)事务中运行所有查询来提高性能。您可以通过创建一个会话并使用 Session.Run() 来运行所需的任何数量的查询来实现这一点。

优先考虑吞吐量而不是数据完整性
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
for i := 0; i < 10000; i++ {
    session.Run(ctx, "<QUERY>", nil)
}
优先考虑数据完整性而不是吞吐量
for i := 0; i < 10000; i++ {
    neo4j.ExecuteQuery(ctx, driver, "<QUERY>", nil, neo4j.EagerResultTransformer)
    // or session.executeRead/Write() calls
}

不要一次性获取大型结果集

提交可能会生成大量记录的查询时,不要一次性检索所有记录。Neo4j 服务器可以分批检索记录并将它们流式传输到驱动程序,这些记录在可用时变得可用。延迟加载结果可以分散网络流量和内存使用量。

为了方便起见,.ExecuteQuery() 始终一次性检索所有结果记录(这就是 EagerResult 中的 Eager 的含义)。要延迟加载结果,您必须使用 .ExecuteRead/Write()(或其他形式的手动处理 事务)并且不要对结果调用 .Collect(ctx);而是对其进行迭代。

示例 1. 比较急切加载和延迟加载
急切加载 延迟加载
  • 服务器必须从存储中读取所有 250 条记录,然后才能将第一条记录发送到驱动程序(即,客户端需要更长时间才能接收第一条记录)。

  • 在任何记录可供应用程序使用之前,驱动程序必须接收所有 250 条记录。

  • 客户端必须在内存中保存所有 250 条记录。

  • 服务器读取第一条记录并将其发送到驱动程序。

  • 应用程序可以在第一条记录传输后立即处理记录。

  • 对剩余记录的等待时间和资源消耗被延迟到应用程序请求更多记录时。

  • 服务器的获取时间可用于客户端处理。

  • 资源消耗受驱动程序的获取大小限制。

急切加载和延迟加载的时间和内存比较
package main

import (
    "context"
    "time"
    "fmt"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// Returns 250 records, each with properties
// - `output` (an expensive computation, to slow down retrieval)
// - `dummyData` (a list of 10000 ints, about 8 KB).
var slowQuery = `
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
range(1, 10000) AS dummyData
`
// Delay for each processed record
var sleepTime = "0.5s"

func main() {
    ctx := context.Background()
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)

    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    log("LAZY LOADING (executeRead)")
    lazyLoading(ctx, driver)

    log("EAGER LOADING (executeQuery)")
    eagerLoading(ctx, driver)
}

func lazyLoading(ctx context.Context, driver neo4j.DriverWithContext) {
    defer timer("lazyLoading")()

    sleepTimeParsed, err := time.ParseDuration(sleepTime)
    if err != nil {
        panic(err)
    }

    session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
    defer session.Close(ctx)
    session.ExecuteRead(ctx,
        func(tx neo4j.ManagedTransaction) (any, error) {
            log("Submit query")
            result, err := tx.Run(ctx, slowQuery, nil)
            if err != nil {
                return nil, err
            }
            for result.Next(ctx) != false {
                record := result.Record()
                output, _ := record.Get("output")
                log(fmt.Sprintf("Processing record %v", output))
                time.Sleep(sleepTimeParsed)  // proxy for some expensive operation
            }
            return nil, nil
        })
}

func eagerLoading(ctx context.Context, driver neo4j.DriverWithContext) {
    defer timer("eagerLoading")()

    log("Submit query")
    result, err := neo4j.ExecuteQuery(ctx, driver,
        slowQuery,
        nil,
        neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("neo4j"))
    if err != nil {
        panic(err)
    }

    sleepTimeParsed, err := time.ParseDuration(sleepTime)
    if err != nil {
        panic(err)
    }

    // Loop through results and do something with them
    for _, record := range result.Records {
        output, _ := record.Get("output")
        log(fmt.Sprintf("Processing record %v", output))
        time.Sleep(sleepTimeParsed)  // proxy for some expensive operation
    }
}

func log(msg string) {
    fmt.Println("[", time.Now().Unix(), "] ", msg)
}

func timer(name string) func() {
    start := time.Now()
    return func() {
        fmt.Printf("-- %s took %v --\n\n", name, time.Since(start))
    }
}
输出
[ 1718802595 ]  LAZY LOADING (executeRead)
[ 1718802595 ]  Submit query
[ 1718802595 ]  Processing record 0.5309371354666308  (1)
[ 1718802595 ]  Processing record 1.5309371354662915
[ 1718802596 ]  Processing record 2.5309371354663197
...
[ 1718802720 ]  Processing record 249.53093713547042
-- lazyLoading took 2m5.467064085s --

[ 1718802720 ]  EAGER LOADING (executeQuery)
[ 1718802720 ]  Submit query
[ 1718802744 ]  Processing record 0.5309371354666308  (2)
[ 1718802744 ]  Processing record 1.5309371354662915
[ 1718802745 ]  Processing record 2.5309371354663197
...
[ 1718802869 ]  Processing record 249.53093713547042
-- eagerLoading took 2m29.113482541s --  (3)
1 使用延迟加载,第一条记录会很快可用。
2 使用急切加载,第一条记录在查询提交后大约 25 秒可用(即服务器检索完所有 250 条记录后)。
3 使用延迟加载,总运行时间更短,因为在客户端处理记录时,服务器可以获取下一条记录。使用延迟加载,客户端也可以在满足某些条件后停止请求记录(通过对 Result 调用 .Consume(ctx)),从而节省时间和资源。

驱动程序的 获取大小 会影响延迟加载的行为。它指示服务器流式传输数量等于获取大小的记录,然后等待客户端赶上后再检索和发送更多记录。

获取大小允许在客户端侧限制内存消耗。但是,它并不总是限制服务器端的内存消耗:这取决于查询。例如,具有 ORDER BY 的查询需要将整个结果集加载到内存中以进行排序,然后才能将记录流式传输到客户端。

获取大小越小,客户端和服务器之间必须交换的消息就越多。特别是如果服务器的延迟很高,较小的获取大小可能会降低性能。

将读取查询路由到集群读取器

在集群中,将读取查询路由到 辅助节点。您可以通过以下方式执行此操作:

良好做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p:Person) RETURN p", nil, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"),
    neo4j.ExecuteQueryWithReadersRouting())
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
result, err := session.ExecuteRead(ctx,
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "MATCH (p:Person) RETURN p", nil)
    })

不良做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p:Person) RETURN p", nil, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
    // defaults to routing = writers
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
result, err := session.ExecuteWrite(ctx,  // don't ask to write on a read-only operation
    func(tx neo4j.ManagedTransaction) (any, error) {
        return tx.Run(ctx, "MATCH (p:Person) RETURN p", nil)
    })

创建索引

为经常用作筛选条件的属性创建索引。例如,如果您经常根据 name 属性查找 Person 节点,那么在 Person.name 上创建索引将非常有用。您可以使用 CREATE INDEX Cypher 子句为节点和关系创建索引。

// Create an index on Person.name
neo4j.ExecuteQuery(ctx, driver,
    "CREATE INDEX personName FOR (n:Person) ON (n.name)",
    nil, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

有关更多信息,请参见 用于搜索性能的索引

分析查询

分析您的查询 以查找性能可以提高的查询。您可以通过在查询前面加上 PROFILE 来分析查询。服务器输出可通过 ResultSummary 对象上的 .Profile() 方法获得。

result, _ := neo4j.ExecuteQuery(ctx, driver,
    "PROFILE MATCH (p {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    },
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
fmt.Println(result.Summary.Profile().Arguments()["string-representation"])
/*
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator        | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p              |              1 |    1 |       3 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter         | p.name = $name |              1 |    1 |       4 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +AllNodesScan   | p              |             10 |    4 |       5 |            120 |                 9160/0 |   108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+

Total database accesses: 12, total allocated memory: 184
*/

如果某些查询速度非常慢,以至于您甚至无法在合理的时间内运行它们,则可以在查询前面加上 EXPLAIN 而不是 PROFILE。这将返回服务器用于运行查询的计划,但不执行它。服务器输出可通过 ResultSummary 对象上的 .Plan() 方法获得。

result, _ := neo4j.ExecuteQuery(ctx, driver,
    "EXPLAIN MATCH (p {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    },
    neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
fmt.Println(result.Summary.Plan().Arguments()["string-representation"])
/*
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+---------------------+
| Operator        | Details        | Estimated Rows | Pipeline            |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p              |              1 |                     |
| |               +----------------+----------------+                     |
| +Filter         | p.name = $name |              1 |                     |
| |               +----------------+----------------+                     |
| +AllNodesScan   | p              |             10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+

Total database accesses: ?
*/

指定节点标签

在所有查询中指定节点标签。这使查询规划器能够更有效地工作,并利用可用的索引。要了解如何组合标签,请参见 Cypher → 标签表达式

良好做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p:Person|Animal {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    }, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
result, err := session.Run(ctx,
    "MATCH (p:Person|Animal {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    })

不良做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    }, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
result, err := session.Run(ctx,
    "MATCH (p {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    })

批量数据创建

在使用 WITHUNWIND Cypher 子句创建大量记录时,批量查询。

良好做法

numbers := make([]int, 10000)
for i := range numbers { numbers[i] = i }
neo4j.ExecuteQuery(ctx, driver, `
    WITH $numbers AS batch
    UNWIND batch AS value
    MERGE (n:Number)
    SET n.value = value
    `, map[string]any{
        "numbers": numbers,
    }, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))

不良做法

for i := 0; i < 10000; i++ {
    neo4j.ExecuteQuery(ctx, driver,
        "MERGE (:Number {value: $value})",
        map[string]any{
            "value": i,
        }, neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("neo4j"))
}
将大量数据首次导入到新数据库中最有效的方法是使用 neo4j-admin database import 命令。

使用查询参数

始终使用 查询参数 而不是将值硬编码或串联到查询中。除了防止 Cypher 注入外,这还允许利用数据库查询缓存。

良好做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p:Person {name: $name}) RETURN p",
    map[string]any{
        "name": "Alice",
    }, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
session.Run(ctx, "MATCH (p:Person {name: $name}) RETURN p", map[string]any{
    "name": "Alice",
})

不良做法

result, err := neo4j.ExecuteQuery(ctx, driver,
    "MATCH (p:Person {name: 'Alice'}) RETURN p",
    // or "MATCH (p:Person {name: '" + name + "'}) RETURN p"
    nil, neo4j.EagerResultTransformer,
    neo4j.ExecuteQueryWithDatabase("neo4j"))
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
defer session.Close(ctx)
session.Run(ctx, "MATCH (p:Person {name: $name}) RETURN p", nil)
           // or "MATCH (p:Person {name: '" + name + "'}) RETURN p"

并发性

使用 并发模式。如果您在应用程序中并行化复杂且耗时的查询,这可能会对性能产生更大的影响,但如果您运行许多简单的查询,则影响不大。

仅在需要时使用 MERGE 进行创建

Cypher 子句 MERGE 方便用于数据创建,因为它允许在存在与给定模式完全相同的克隆时避免重复数据。但是,它需要数据库运行两个查询:它首先需要 MATCH 模式,然后才能 CREATE 它(如果需要)。

如果您已经知道要插入的数据是新的,请避免使用 MERGE,而是直接使用 CREATE - 这实际上将数据库查询的数量减少了一半。

筛选通知

术语表

LTS

长期支持版本是指保证支持数年之久的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,它允许您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Cypher 上的强大过程 (APOC) 是一个包含(许多)函数的库,这些函数无法轻松地在 Cypher 本身中表达。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认情况下,它在端口 7687 上监听。

ACID

原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的特性。符合 ACID 的 DBMS 确保数据库中的数据即使在出现故障的情况下也能保持准确和一致。

最终一致性

如果数据库保证所有集群成员最终都会存储数据的最新版本,则该数据库是最终一致的。在某个时间点

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库是因果一致的。这比最终一致性更强大。

NULL

空标记不是一种类型,而是表示值不存在的占位符。有关更多信息,请参见 Cypher → 使用 null

事务

事务是工作单元,可以全部提交,也可以在失败时回滚。例如,银行转账:它涉及多个步骤,但这些步骤必须全部成功或被还原,以避免从一个账户中扣款,但没有添加到另一个账户中。

背压

背压是反对数据流动的力量。它确保客户端不会被它无法处理的速度更快的数据所淹没。

事务函数

事务函数是在 ExecuteReadExecuteWrite 调用时执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

DriverWithContext

一个 DriverWithContext 对象保存与 Neo4j 数据库建立连接所需的详细信息。