性能建议

始终指定目标数据库

**在所有查询中指定目标数据库**,可以使用 Driver.execute_query() 中的 database_ 参数 或使用 创建新会话时的 database 参数。如果未提供数据库,驱动程序必须向服务器发送额外的请求以确定默认数据库是什么。对于单个查询,开销很小,但在数百个查询中会变得很显著。

良好实践

driver.execute_query("<QUERY>", database_="<DB NAME>")
driver.session(database="<DB NAME>")

不良实践

driver.execute_query("<QUERY>")
driver.session()

了解事务的成本

当通过 .execute_query() 或通过 .execute_read/write() 提交查询时,服务器会自动将它们包装到 事务 中。此行为确保数据库始终处于一致状态,无论事务执行过程中发生什么情况(断电、软件崩溃等)。

在许多查询周围创建一个安全的执行上下文会产生在驱动程序只是向服务器发送查询并希望它们能够通过时不存在的开销。开销很小,但随着查询数量的增加而累积。因此,如果您的用例更重视吞吐量而不是数据完整性,则可以通过在单个(自动提交)事务中运行所有查询来获得更高的性能。您可以通过创建一个会话并使用 session.run() 运行任意数量的查询来做到这一点。

优先考虑吞吐量而非数据完整性
with driver.session(database="neo4j") as session:
    for i in range(1000):
        session.run("<QUERY>")
优先考虑数据完整性而非吞吐量
for i in range(1000):
    driver.execute_query("<QUERY>")
    # or session.execute_read/write() calls

不要一次性获取大型结果集

在提交可能产生大量记录的查询时,不要一次性检索所有记录。Neo4j 服务器可以分批检索记录,并在它们可用时将其流式传输到驱动程序。延迟加载结果可以分散网络流量和内存使用情况(客户端和服务器端)。

为了方便起见,.execute_query() 始终会一次性检索所有结果记录(这就是 EagerResultEager 的含义)。要延迟加载结果,您必须使用 .execute_read/write()(或其他形式的手动处理 事务)并且**不要**在处理结果时将 Result 对象转换为 list;而是对其进行迭代。

示例 1. 渴望加载和延迟加载之间的比较
渴望加载 延迟加载
  • 服务器必须在可以将第一个记录发送到驱动程序之前从存储中读取所有 250 条记录(即,客户端需要更多时间才能收到第一条记录)。

  • 在任何记录可供应用程序使用之前,驱动程序必须接收所有 250 条记录。

  • 客户端必须在内存中保存所有 250 条记录。

  • 服务器读取第一条记录并将其发送到驱动程序。

  • 应用程序可以在传输第一条记录后立即处理记录。

  • 剩余记录的等待时间和资源消耗被延迟到应用程序请求更多记录时。

  • 服务器的获取时间可用于客户端处理。

  • 资源消耗受驱动程序的获取大小限制。

渴望加载和延迟加载之间的时间和内存比较
import neo4j
from time import sleep, time
import tracemalloc



URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")

# Returns 250 records, each with properties
# - `output` (an expensive computation, to slow down retrieval)
# - `dummyData` (a list of 10000 ints, about 8 KB).
slow_query = '''
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
       range(1, 10000) AS dummyData
'''
# Delay for each processed record
sleep_time = 0.5


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()

        start_time = time()
        log('LAZY LOADING (execute_read)')
        tracemalloc.start()
        lazy_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))

        start_time = time()
        log('EAGER LOADING (execute_query)')
        tracemalloc.start()
        eager_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))


def lazy_loading(driver):

    def process_records(tx):
        log('Submit query')
        result = tx.run(slow_query)

        for record in result:
            log(f'Processing record {int(record.get("output"))}')
            sleep(sleep_time)  # proxy for some expensive operation

    with driver.session(database='neo4j') as session:
        processed_result = session.execute_read(process_records)


def eager_loading(driver):
    log('Submit query')
    records, _, _ = driver.execute_query(slow_query, database_='neo4j')

    for record in records:
        log(f'Processing record {int(record.get("output"))}')
        sleep(sleep_time)  # proxy for some expensive operation


def log(msg):
    print(f'[{round(time(), 2)}] {msg}')


if __name__ == '__main__':
    main()
输出
[1718014256.98] LAZY LOADING (execute_read)
[1718014256.98] Submit query
[1718014256.21] Processing record 0  (1)
[1718014256.71] Processing record 1
[1718014257.21] Processing record 2
...
[1718014395.42] Processing record 249
[1718014395.92] Peak memory usage: 786254 bytes
[1719984711.39] --- 135.9284942150116 seconds ---

[1718014395.92] EAGER LOADING (execute_query)
[1718014395.92] Submit query
[1718014419.82] Processing record 0  (2)
[1718014420.33] Processing record 1
[1718014420.83] Processing record 2
...
[1718014544.52] Processing record 249
[1718014545.02] Peak memory usage: 89587150 bytes  (3)
[1719984861.09] --- 149.70468592643738 seconds ---  (4)
1 使用延迟加载,第一条记录可以快速获得。
2 使用渴望加载,第一条记录在查询提交约 25 秒后可用(即,服务器检索完所有 250 条记录后)。
3 渴望加载的内存使用量大于延迟加载,因为应用程序将 250 条记录的列表具体化。
4 延迟加载的总运行时间较短,因为当客户端处理记录时,服务器可以获取下一条记录。使用延迟加载,客户端也可以在满足某些条件后停止请求记录(通过在 Result 上调用 .consume()),从而节省时间和资源。

驱动程序的 获取大小 会影响延迟加载的行为。它指示服务器流式传输等于获取大小的记录数量,然后等待客户端赶上,然后再检索和发送更多记录。

获取大小允许限制客户端上的内存消耗。但是,它并不总是限制服务器端的内存消耗:这取决于查询。例如,带有 ORDER BY 的查询需要将整个结果集加载到内存中进行排序,然后才能将记录流式传输到客户端。

获取大小越小,客户端和服务器需要交换的消息就越多。特别是如果服务器的延迟很高,较小的获取大小可能会降低性能。

将读取查询路由到集群读取器

在集群中,**将读取查询路由到 辅助节点**。您可以通过以下方式做到这一点

良好实践

driver.execute_query("MATCH (p:Person) RETURN p", routing_="r")
session.execute_read(lambda tx: tx.run("MATCH (p:Person) RETURN p"))

不良实践

driver.execute_query("MATCH (p:Person) RETURN p")
# defaults to routing = writers
session.execute_write(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
# don't ask to write on a read-only operation

创建索引

**为经常过滤的属性创建索引**。例如,如果您经常按 name 属性查找 Person 节点,则在 Person.name 上创建索引很有用。您可以使用 CREATE INDEX Cypher 子句为节点和关系创建索引。

# Create an index on Person.name
driver.execute_query("CREATE INDEX person_name FOR (n:Person) ON (n.name)")

有关更多信息,请参阅 用于搜索性能的索引

分析查询

分析您的查询 以找到可以提高性能的查询。您可以通过在查询前添加 PROFILE 来分析查询。服务器输出可在 ResultSummary 对象的 profile 属性中获得。

_, summary, _ = driver.execute_query("PROFILE MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.profile['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator        | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p              |              1 |    1 |       3 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter         | p.name = $name |              1 |    1 |       4 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +AllNodesScan   | p              |             10 |    4 |       5 |            120 |                 9160/0 |   108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+

Total database accesses: 12, total allocated memory: 184
"""

如果某些查询速度非常慢,以至于您甚至无法在合理的时间内运行它们,则可以将 EXPLAIN 而不是 PROFILE 添加到查询前面。这将返回服务器将用于运行查询的计划,但不会执行它。服务器输出可在 ResultSummary 对象的 plan 属性中获得。

_, summary, _ = driver.execute_query("EXPLAIN MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.plan['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+---------------------+
| Operator        | Details        | Estimated Rows | Pipeline            |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p              |              1 |                     |
| |               +----------------+----------------+                     |
| +Filter         | p.name = $name |              1 |                     |
| |               +----------------+----------------+                     |
| +AllNodesScan   | p              |             10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+

Total database accesses: ?
"""

指定节点标签

**在所有查询中指定节点标签**。这允许查询规划器更有效地工作,并在可用时利用索引。要了解如何组合标签,请参阅 Cypher → 标签表达式

良好实践

driver.execute_query("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")
with driver.session(database="<DB NAME>") as session:
    session.run("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p {name: $name}) RETURN p", name="Alice")
with driver.session(database="<DB NAME>") as session:
    session.run("MATCH (p {name: $name}) RETURN p", name="Alice")

批量数据创建

创建大量记录时使用批处理查询,使用 WITHUNWIND Cypher 子句。

最佳实践

提交一个包含所有值的单个查询
numbers = [{"value": random()} for _ in range(10000)]
driver.execute_query("""
    WITH $numbers AS batch
    UNWIND batch AS node
    MERGE (n:Number)
    SET n.value = node.value
    """, numbers=numbers,
)

不良实践

提交许多单个查询,每个值一个
for _ in range(10000):
    driver.execute_query("MERGE (:Number {value: $value})", value=random())
将大量数据首次导入到新数据库中最有效的方法是使用 neo4j-admin database import 命令。

使用查询参数

始终使用 查询参数,而不是将值硬编码或连接到查询中。除了防止 Cypher 注入外,这还可以更好地利用数据库查询缓存。

良好实践

driver.execute_query("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
with driver.session(database="<DB NAME>") as session:
    session.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p:Person {name: 'Alice'}) RETURN p")
# or
name = "Alice"
driver.execute_query("MATCH (p:Person {name: '" + name + "'}) RETURN p")
with driver.session(database="<DB NAME>") as session:
    session.run("MATCH (p:Person {name: 'Alice'}) RETURN p")
    # or
    name = "Alice"
    session.run("MATCH (p:Person {name: '" + name + "'}) RETURN p")

并发

使用 并发,无论是多线程形式还是使用驱动程序的异步版本。如果您在应用程序中并行化复杂且耗时的查询,这可能会对性能产生更大的影响,但如果您运行许多简单的查询,则影响不大。

仅在需要时使用 MERGE 进行创建

Cypher 子句 MERGE 对于数据创建非常方便,因为它可以避免在存在给定模式的完全克隆时出现重复数据。但是,它需要数据库运行两个查询:它首先需要 MATCH 模式,然后才能 CREATE 它(如果需要)。

如果您已经知道要插入的数据是新的,请避免使用 MERGE 并直接使用 CREATE — 这实际上将数据库查询的数量减少了一半。

过滤通知

术语表

LTS

长期支持版本是保证支持数年。Neo4j 4.4 是 LTS,Neo4j 5 也将具有 LTS 版本。

Aura

Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可用于从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数本身无法用 Cypher 轻松表达。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互使用的协议。默认情况下,它侦听端口 7687。

ACID

原子性、一致性、隔离性、持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据在出现故障时仍然准确且一致。

最终一致性

如果数据库保证所有集群成员将在某个时间点存储数据的最新版本,则该数据库最终一致。

因果一致性

如果读写查询以相同的顺序由集群中的每个成员看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是值不存在的占位符。有关更多信息,请参阅 Cypher → 使用 null

事务

事务是工作的单元,要么完全提交,要么在失败时回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免从一个帐户中扣除资金但未添加到另一个帐户中。

背压

背压是阻碍数据流动的力量。它确保客户端不会被数据以快于其处理速度的速度淹没。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

驱动程序

一个 Driver 对象包含建立与 Neo4j 数据库连接所需的详细信息。