性能推荐
使用 Rust 扩展
适用于 Python 驱动程序的 Rust 扩展是一个替代的驱动程序包,与常规驱动程序相比,可将速度 提升 3 到 10 倍。您可以使用 pip install neo4j-rust-ext
进行安装,可以与 neo4j
包一起安装,也可以作为其替代。在用法方面,这两个驱动程序是相同的:本指南中的所有内容均适用于这两个包。
始终指定目标数据库
在所有查询中指定目标数据库,可以通过 Driver.execute_query()
中的 database_
参数,或者通过 创建新会话时的 database
参数来指定。如果未提供数据库,驱动程序必须向服务器发送额外请求以确定默认数据库是什么。单个查询的开销很小,但对于数百个查询而言将变得显著。
最佳实践
driver.execute_query("<QUERY>", database_="{neo4j-database-name}")
driver.session(database="{neo4j-database-name}")
不良实践
driver.execute_query("<QUERY>")
driver.session()
注意事务的成本
通过 .execute_query()
或 .execute_read/write()
提交查询时,服务器会自动将它们封装在 事务中。这种行为可确保数据库始终处于一致状态,无论事务执行期间发生什么(断电、软件崩溃等)。
在多个查询周围创建一个安全的执行上下文会产生额外的开销,如果驱动程序只是向服务器发送查询并希望它们能够通过,则不会存在此开销。开销虽小,但随着查询数量的增加,它会累积起来。因此,如果您的用例更看重吞吐量而非数据完整性,您可以通过在单个(自动提交)事务中运行所有查询来进一步提升性能。您可以通过创建会话并使用 session.run()
来运行所需数量的查询来实现此目的。
with driver.session(database="neo4j") as session:
for i in range(1000):
session.run("<QUERY>")
for i in range(1000):
driver.execute_query("<QUERY>", database_="{neo4j-database-name}")
# or session.execute_read/write() calls
不要一次性获取大量结果集
提交可能产生大量记录的查询时,不要一次性检索所有记录。Neo4j 服务器可以分批检索记录,并在记录可用时将其流式传输给驱动程序。延迟加载结果可以分散网络流量和内存使用(包括客户端和服务器端)。
为方便起见,.execute_query()
总是立即检索所有结果记录(EagerResult
中的 Eager
代表的正是此意)。要延迟加载结果,您必须使用 .execute_read/write()
(或手动处理 事务 的其他形式),并且在处理结果时不要将 Result
对象强制转换为 list
;而是对其进行迭代。有关结果处理的更多信息,请参阅 事务 → 处理查询结果。
即时加载 | 延迟加载 |
---|---|
|
|
import neo4j
from time import sleep, time
import tracemalloc
URI = "{neo4j-database-uri}"
AUTH = ("{neo4j-username}", "{neo4j-password}")
# Returns 250 records, each with properties
# - `output` (an expensive computation, to slow down retrieval)
# - `dummyData` (a list of 10000 ints, about 8 KB).
slow_query = '''
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
range(1, 10000) AS dummyData
'''
# Delay for each processed record
sleep_time = 0.5
def main():
with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
driver.verify_connectivity()
start_time = time()
log('LAZY LOADING (execute_read)')
tracemalloc.start()
lazy_loading(driver)
log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
tracemalloc.stop()
log('--- %s seconds ---' % (time() - start_time))
start_time = time()
log('EAGER LOADING (execute_query)')
tracemalloc.start()
eager_loading(driver)
log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
tracemalloc.stop()
log('--- %s seconds ---' % (time() - start_time))
def lazy_loading(driver):
def process_records(tx):
log('Submit query')
result = tx.run(slow_query)
for record in result:
log(f'Processing record {int(record.get("output"))}')
sleep(sleep_time) # proxy for some expensive operation
with driver.session(database='neo4j') as session:
processed_result = session.execute_read(process_records)
def eager_loading(driver):
log('Submit query')
records, _, _ = driver.execute_query(slow_query, database_='neo4j')
for record in records:
log(f'Processing record {int(record.get("output"))}')
sleep(sleep_time) # proxy for some expensive operation
def log(msg):
print(f'[{round(time(), 2)}] {msg}')
if __name__ == '__main__':
main()
[1718014256.98] LAZY LOADING (execute_read)
[1718014256.98] Submit query
[1718014256.21] Processing record 0 (1)
[1718014256.71] Processing record 1
[1718014257.21] Processing record 2
...
[1718014395.42] Processing record 249
[1718014395.92] Peak memory usage: 786254 bytes
[1718014395.92] --- 135.9284942150116 seconds ---
[1718014395.92] EAGER LOADING (execute_query)
[1718014395.92] Submit query
[1718014419.82] Processing record 0 (2)
[1718014420.33] Processing record 1
[1718014420.83] Processing record 2
...
[1718014544.52] Processing record 249
[1718014545.02] Peak memory usage: 89587150 bytes (3)
[1718014545.02] --- 149.70468592643738 seconds --- (4)
1 | 使用延迟加载,第一条记录可更快地可用。 |
2 | 使用即时加载,一旦结果被消费(即服务器检索到所有 250 条记录后),第一条记录才可用。 |
3 | 使用即时加载时内存使用量更大,因为应用程序会实例化一个包含 250 条记录的列表。 |
4 | 使用即时加载时,总运行时间更长,因为客户端会等待直到接收到最后一条记录;而使用延迟加载时,客户端可以在服务器获取下一条记录的同时处理记录。使用延迟加载,客户端还可以在满足某些条件后停止请求记录(通过在 Result 对象上调用 .consume() )以节省时间和资源。 |
将读取查询路由到集群读取器
在集群中,将读取查询路由到任何读取器节点。您可以通过以下方式实现:
-
在
Driver.execute_query()
调用中指定routing_="r"
-
使用
Session.execute_read()
而非Session.execute_write()
(对于托管事务) -
创建新会话时设置
default_access_mode=neo4j.READ_ACCESS
(对于显式事务)。
最佳实践
driver.execute_query("MATCH (p:Person) RETURN p", routing_="r")
session.execute_read(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
不良实践
driver.execute_query("MATCH (p:Person) RETURN p")
# defaults to routing = writers
session.execute_write(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
# don't ask to write on a read-only operation
创建索引
为您经常过滤的属性创建索引。例如,如果您经常通过 name
属性查找 Person
节点,那么在 Person.name
上创建索引会很有益。您可以使用 CREATE INDEX
Cypher 子句为节点和关系创建索引。
# Create an index on Person.name
driver.execute_query("CREATE INDEX person_name FOR (n:Person) ON (n.name)")
有关更多信息,请参阅 搜索性能索引。
分析查询
分析您的查询以找出可以提高性能的查询。您可以通过在查询前加上 PROFILE
来分析查询。服务器输出可在 ResultSummary
对象的 profile
属性中找到。
_, summary, _ = driver.execute_query("PROFILE MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.profile['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator | Details | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p | 1 | 1 | 3 | | | | |
| | +----------------+----------------+------+---------+----------------+ | | |
| +Filter | p.name = $name | 1 | 1 | 4 | | | | |
| | +----------------+----------------+------+---------+----------------+ | | |
| +AllNodesScan | p | 10 | 4 | 5 | 120 | 9160/0 | 108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
Total database accesses: 12, total allocated memory: 184
"""
如果某些查询速度太慢,甚至无法在合理的时间内运行,您可以在其前面加上 EXPLAIN
而不是 PROFILE
。这将返回服务器将用于运行查询的计划,但不会实际执行它。服务器输出可在 ResultSummary
对象的 plan
属性中找到。
_, summary, _ = driver.execute_query("EXPLAIN MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.plan['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128
+-----------------+----------------+----------------+---------------------+
| Operator | Details | Estimated Rows | Pipeline |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p | 1 | |
| | +----------------+----------------+ |
| +Filter | p.name = $name | 1 | |
| | +----------------+----------------+ |
| +AllNodesScan | p | 10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+
Total database accesses: ?
"""
指定节点标签
在所有查询中指定节点标签。这使得查询规划器能够更高效地工作,并利用可用的索引。要了解如何组合标签,请参阅 Cypher → 标签表达式。
最佳实践
driver.execute_query("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
session.run("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")
不良实践
driver.execute_query("MATCH (p {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
session.run("MATCH (p {name: $name}) RETURN p", name="Alice")
批量数据创建
最佳实践
numbers = [{"value": random()} for _ in range(10000)]
driver.execute_query("""
UNWIND $numbers AS node
MERGE (:Number {value: node.value})
""", numbers=numbers,
)
不良实践
for _ in range(10000):
driver.execute_query("MERGE (:Number {value: $value})", value=random())
将大量数据首次导入到新数据库的最有效方法是使用 neo4j-admin database import 命令。 |
使用查询参数
始终使用 查询参数,而不是将值硬编码或拼接进查询中。除了防止 Cypher 注入外,这还能更好地利用数据库查询缓存。
最佳实践
driver.execute_query("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
with driver.session(database="{neo4j-database-name}") as session:
session.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
不良实践
driver.execute_query("MATCH (p:Person {name: 'Alice'}) RETURN p")
# or
name = "Alice"
driver.execute_query("MATCH (p:Person {name: '" + name + "'}) RETURN p")
with driver.session(database="{neo4j-database-name}") as session:
session.run("MATCH (p:Person {name: 'Alice'}) RETURN p")
# or
name = "Alice"
session.run("MATCH (p:Person {name: '" + name + "'}) RETURN p")
并发
使用 并发,无论是多线程形式还是驱动程序的异步版本。如果您在应用程序中并行化复杂且耗时的查询,这将对性能产生更大的影响,但如果运行许多简单的查询则影响不大。
筛选通知
术语表
- LTS
-
长期支持(Long Term Support)版本是保证支持多年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)无法轻松在 Cypher 中表达的函数的库。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。
- ACID
-
原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)(ACID)是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据在发生故障时仍能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员将在某个时间点存储最新版本的数据,则该数据库是最终一致的。
- 因果一致性
-
如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是工作的一个单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们都必须成功或被撤销,以避免资金从一个账户中扣除而未添加到另一个账户中。
- 背压
-
背压是阻碍数据流动的力。它确保客户端不会因数据传输速度过快而超负荷。
- 事务函数
-
事务函数是由
execute_read
或execute_write
调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。 - 驱动程序
-
一个
Driver
对象包含了与 Neo4j 数据库建立连接所需的详细信息。