性能推荐

使用 Rust 扩展

适用于 Python 驱动程序的 Rust 扩展是一个替代的驱动程序包,与常规驱动程序相比,可将速度 提升 3 到 10 倍。您可以使用 pip install neo4j-rust-ext 进行安装,可以与 neo4j 包一起安装,也可以作为其替代。在用法方面,这两个驱动程序是相同的:本指南中的所有内容均适用于这两个包。

始终指定目标数据库

在所有查询中指定目标数据库,可以通过 Driver.execute_query() 中的 database_ 参数,或者通过 创建新会话时的 database 参数来指定。如果未提供数据库,驱动程序必须向服务器发送额外请求以确定默认数据库是什么。单个查询的开销很小,但对于数百个查询而言将变得显著。

最佳实践

driver.execute_query("<QUERY>", database_="{neo4j-database-name}")
driver.session(database="{neo4j-database-name}")

不良实践

driver.execute_query("<QUERY>")
driver.session()

注意事务的成本

通过 .execute_query().execute_read/write() 提交查询时,服务器会自动将它们封装在 事务中。这种行为可确保数据库始终处于一致状态,无论事务执行期间发生什么(断电、软件崩溃等)。

在多个查询周围创建一个安全的执行上下文会产生额外的开销,如果驱动程序只是向服务器发送查询并希望它们能够通过,则不会存在此开销。开销虽小,但随着查询数量的增加,它会累积起来。因此,如果您的用例更看重吞吐量而非数据完整性,您可以通过在单个(自动提交)事务中运行所有查询来进一步提升性能。您可以通过创建会话并使用 session.run() 来运行所需数量的查询来实现此目的。

优先考虑吞吐量而非数据完整性
with driver.session(database="neo4j") as session:
    for i in range(1000):
        session.run("<QUERY>")
优先考虑数据完整性而非吞吐量
for i in range(1000):
    driver.execute_query("<QUERY>", database_="{neo4j-database-name}")
    # or session.execute_read/write() calls

不要一次性获取大量结果集

提交可能产生大量记录的查询时,不要一次性检索所有记录。Neo4j 服务器可以分批检索记录,并在记录可用时将其流式传输给驱动程序。延迟加载结果可以分散网络流量和内存使用(包括客户端和服务器端)。

为方便起见,.execute_query() 总是立即检索所有结果记录(EagerResult 中的 Eager 代表的正是此意)。要延迟加载结果,您必须使用 .execute_read/write()(或手动处理 事务 的其他形式),并且在处理结果时不要Result 对象强制转换为 list;而是对其进行迭代。有关结果处理的更多信息,请参阅 事务 → 处理查询结果

示例 1. 即时加载和延迟加载的比较
即时加载 延迟加载
  • 服务器必须先从存储中读取所有 250 条记录,然后才能将第一条记录发送给驱动程序(即客户端接收第一条记录需要更多时间)。

  • 在任何记录可供应用程序使用之前,驱动程序必须接收所有 250 条记录。

  • 客户端必须在内存中保存所有 250 条记录。

  • 服务器读取第一条记录并将其发送给驱动程序。

  • 应用程序可以在第一条记录传输后立即处理记录。

  • 剩余记录的等待时间和资源消耗推迟到应用程序请求更多记录时进行。

  • 服务器的获取时间可用于客户端处理。

  • 资源消耗受驱动程序获取大小的限制。

即时加载和延迟加载的时间与内存比较
import neo4j
from time import sleep, time
import tracemalloc



URI = "{neo4j-database-uri}"
AUTH = ("{neo4j-username}", "{neo4j-password}")

# Returns 250 records, each with properties
# - `output` (an expensive computation, to slow down retrieval)
# - `dummyData` (a list of 10000 ints, about 8 KB).
slow_query = '''
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
       range(1, 10000) AS dummyData
'''
# Delay for each processed record
sleep_time = 0.5


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()

        start_time = time()
        log('LAZY LOADING (execute_read)')
        tracemalloc.start()
        lazy_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))

        start_time = time()
        log('EAGER LOADING (execute_query)')
        tracemalloc.start()
        eager_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))


def lazy_loading(driver):

    def process_records(tx):
        log('Submit query')
        result = tx.run(slow_query)

        for record in result:
            log(f'Processing record {int(record.get("output"))}')
            sleep(sleep_time)  # proxy for some expensive operation

    with driver.session(database='neo4j') as session:
        processed_result = session.execute_read(process_records)


def eager_loading(driver):
    log('Submit query')
    records, _, _ = driver.execute_query(slow_query, database_='neo4j')

    for record in records:
        log(f'Processing record {int(record.get("output"))}')
        sleep(sleep_time)  # proxy for some expensive operation


def log(msg):
    print(f'[{round(time(), 2)}] {msg}')


if __name__ == '__main__':
    main()
输出
[1718014256.98] LAZY LOADING (execute_read)
[1718014256.98] Submit query
[1718014256.21] Processing record 0  (1)
[1718014256.71] Processing record 1
[1718014257.21] Processing record 2
...
[1718014395.42] Processing record 249
[1718014395.92] Peak memory usage: 786254 bytes
[1718014395.92] --- 135.9284942150116 seconds ---

[1718014395.92] EAGER LOADING (execute_query)
[1718014395.92] Submit query
[1718014419.82] Processing record 0  (2)
[1718014420.33] Processing record 1
[1718014420.83] Processing record 2
...
[1718014544.52] Processing record 249
[1718014545.02] Peak memory usage: 89587150 bytes  (3)
[1718014545.02] --- 149.70468592643738 seconds ---  (4)
1 使用延迟加载,第一条记录可更快地可用。
2 使用即时加载,一旦结果被消费(即服务器检索到所有 250 条记录后),第一条记录才可用。
3 使用即时加载时内存使用量更大,因为应用程序会实例化一个包含 250 条记录的列表。
4 使用即时加载时,总运行时间更长,因为客户端会等待直到接收到最后一条记录;而使用延迟加载时,客户端可以在服务器获取下一条记录的同时处理记录。使用延迟加载,客户端还可以在满足某些条件后停止请求记录(通过在 Result 对象上调用 .consume())以节省时间和资源。

驱动程序的 获取大小 影响延迟加载的行为。它指示服务器流式传输与获取大小相等的记录量,然后等待客户端追赶上,然后再检索和发送更多记录。

获取大小通常允许限制客户端的内存消耗,特别是对于记录大小变化较小的结果。如果单个记录非常大,驱动程序仍然需要为整个对象分配空间,因此即使获取大小很小,内存使用量也可能变得很大。

另一方面,获取大小并不总是限制服务器端的内存消耗:这取决于查询。例如,带有 ORDER BY 的查询需要在记录流式传输到客户端之前,将整个结果集加载到内存中进行排序。

获取大小越小,客户端和服务器需要交换的消息越多。特别是当服务器延迟很高时,较低的获取大小可能会降低性能。

将读取查询路由到集群读取器

在集群中,将读取查询路由到任何读取器节点。您可以通过以下方式实现:

最佳实践

driver.execute_query("MATCH (p:Person) RETURN p", routing_="r")
session.execute_read(lambda tx: tx.run("MATCH (p:Person) RETURN p"))

不良实践

driver.execute_query("MATCH (p:Person) RETURN p")
# defaults to routing = writers
session.execute_write(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
# don't ask to write on a read-only operation

创建索引

为您经常过滤的属性创建索引。例如,如果您经常通过 name 属性查找 Person 节点,那么在 Person.name 上创建索引会很有益。您可以使用 CREATE INDEX Cypher 子句为节点和关系创建索引。

# Create an index on Person.name
driver.execute_query("CREATE INDEX person_name FOR (n:Person) ON (n.name)")

有关更多信息,请参阅 搜索性能索引

分析查询

分析您的查询以找出可以提高性能的查询。您可以通过在查询前加上 PROFILE 来分析查询。服务器输出可在 ResultSummary 对象的 profile 属性中找到。

_, summary, _ = driver.execute_query("PROFILE MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.profile['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator        | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p              |              1 |    1 |       3 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter         | p.name = $name |              1 |    1 |       4 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +AllNodesScan   | p              |             10 |    4 |       5 |            120 |                 9160/0 |   108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+

Total database accesses: 12, total allocated memory: 184
"""

如果某些查询速度太慢,甚至无法在合理的时间内运行,您可以在其前面加上 EXPLAIN 而不是 PROFILE。这将返回服务器将用于运行查询的计划,但不会实际执行它。服务器输出可在 ResultSummary 对象的 plan 属性中找到。

_, summary, _ = driver.execute_query("EXPLAIN MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.plan['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+---------------------+
| Operator        | Details        | Estimated Rows | Pipeline            |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p              |              1 |                     |
| |               +----------------+----------------+                     |
| +Filter         | p.name = $name |              1 |                     |
| |               +----------------+----------------+                     |
| +AllNodesScan   | p              |             10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+

Total database accesses: ?
"""

指定节点标签

在所有查询中指定节点标签。这使得查询规划器能够更高效地工作,并利用可用的索引。要了解如何组合标签,请参阅 Cypher → 标签表达式

最佳实践

driver.execute_query("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
    session.run("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
    session.run("MATCH (p {name: $name}) RETURN p", name="Alice")

批量数据创建

使用 WITHUNWIND Cypher 子句在创建大量记录时进行批量查询

最佳实践

提交包含所有值的单个查询
numbers = [{"value": random()} for _ in range(10000)]
driver.execute_query("""
    UNWIND $numbers AS node
    MERGE (:Number {value: node.value})
    """, numbers=numbers,
)

不良实践

提交大量单个查询,每个值一个
for _ in range(10000):
    driver.execute_query("MERGE (:Number {value: $value})", value=random())
将大量数据首次导入到新数据库的最有效方法是使用 neo4j-admin database import 命令。

使用查询参数

始终使用 查询参数,而不是将值硬编码或拼接进查询中。除了防止 Cypher 注入外,这还能更好地利用数据库查询缓存。

最佳实践

driver.execute_query("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
    session.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p:Person {name: 'Alice'}) RETURN p")
# or
name = "Alice"
driver.execute_query("MATCH (p:Person {name: '" + name + "'}) RETURN p")
with driver.session(database="{neo4j-database-name}") as session:
    session.run("MATCH (p:Person {name: 'Alice'}) RETURN p")
    # or
    name = "Alice"
    session.run("MATCH (p:Person {name: '" + name + "'}) RETURN p")

并发

使用 并发,无论是多线程形式还是驱动程序的异步版本。如果您在应用程序中并行化复杂且耗时的查询,这将对性能产生更大的影响,但如果运行许多简单的查询则影响不大。

仅在需要时使用 MERGE 进行创建

Cypher 子句 MERGE 方便数据创建,因为它允许在给定模式的精确克隆存在时避免重复数据。但是,它要求数据库运行两个查询:它首先需要 MATCH 模式,然后才能 CREATE 它(如果需要)。

如果您已经知道要插入的数据是新的,请避免使用 MERGE,而是直接使用 CREATE — 这实际上将数据库查询的数量减半。

筛选通知

术语表

LTS

长期支持(Long Term Support)版本是保证支持多年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)无法轻松在 Cypher 中表达的函数的库。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。

ACID

原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)(ACID)是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据在发生故障时仍能保持准确和一致。

最终一致性

如果数据库保证所有集群成员将在某个时间点存储最新版本的数据,则该数据库是最终一致的。

因果一致性

如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。

NULL

空标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅 Cypher → 使用 null

事务

事务是工作的一个单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须成功或被撤销,以避免资金从一个账户中扣除而未添加到另一个账户中。

背压

背压是阻碍数据流动的力。它确保客户端不会因数据传输速度过快而超负荷。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。

驱动程序

一个 Driver 对象包含了与 Neo4j 数据库建立连接所需的详细信息。

© . All rights reserved.