进一步的查询机制
隐式(或自动提交)事务
这是运行 Cypher 查询最基本和受限的形式。驱动程序不会自动重试隐式事务,而对于通过 execute_query()
和 托管事务 运行的查询则会重试。仅当其他驱动程序查询接口不适用或用于快速原型设计时,才应使用隐式事务。
您可以使用 Session.run()
方法运行隐式事务。它返回一个需要相应处理的 Result
对象。
with driver.session(database="neo4j") as session:
session.run("CREATE (a:Person {name: $name})", name="Licia")
隐式事务最迟会在会话销毁时或在同一会话中执行另一个事务之前提交。除此之外,无法明确保证隐式事务在会话生命周期内何时准确提交。为确保隐式事务提交,您可以在其结果上调用 .consume()
方法。
由于驱动程序无法判断 session.run()
调用中的查询是需要数据库的读会话还是写会话,因此它默认为写。如果您的隐式事务仅包含读查询,则在创建会话时通过设置关键字参数 default_access_mode=neo4j.READ_ACCESS
可以让驱动程序知晓,从而获得性能提升。
隐式事务是唯一可用于 CALL { … } IN TRANSACTIONS 查询的事务。.
|
导入 CSV 文件
使用 Session.run()
最常见的用例是使用 LOAD CSV
Cypher 子句将大型 CSV 文件导入数据库,并防止因事务大小而导致的超时错误。
with driver.session(database="neo4j") as session:
result = session.run("""
LOAD CSV FROM 'https://data.neo4j.com/bands/artists.csv' AS line
CALL {
WITH line
MERGE (:Artist {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS OF 2 ROWS
""")
print(result.consume().counters)
虽然 LOAD CSV 很方便,但将 CSV 文件的解析推迟到您的 Python 应用程序并避免使用 LOAD CSV 并没有什么不妥。实际上,将解析逻辑移至应用程序可以使您对导入过程拥有更多控制权。有关高效批量数据插入,请参阅性能 → 批量数据创建。 |
有关更多信息,请参阅 Cypher → 子句 → 加载 CSV。
事务配置
Query
对象允许指定查询超时时间并将元数据附加到事务。元数据在服务器日志中可见(如 unit_of_work
装饰器所述)。
from neo4j import Query
with driver.session(database="neo4j") as session:
query = Query("CREATE (a:Person {name: $name})",
timeout=1.0,
metadata={"app_name": "people"})
result = session.run(query, name="John")
属性键、关系类型和标签中的动态值
通常,您不应将参数直接连接到查询中,而应使用查询参数。然而,在某些情况下,您的查询结构可能无法在所有部分使用参数。实际上,尽管参数可用于字面量和表达式以及节点和关系 ID,但它们不能用于以下构造:
-
属性键,因此
MATCH (n) WHERE n.$param = 'something'
是无效的; -
关系类型,因此
MATCH (n)-[:$param]→(m)
是无效的; -
标签,因此
MATCH (n:$param)
是无效的。
对于这些查询,您被迫使用字符串拼接。为防止 Cypher 注入,您应将动态值用反引号括起来并自行转义。请注意 Cypher 处理 Unicode,因此也要注意 Unicode 字面量 \u0060
。
label = "Person\\u0060n"
# convert \u0060 to literal backtick and then escape backticks
escaped_label = label.replace("\\u0060", "`").replace("`", "``")
driver.execute_query(
f"MATCH (p:`{escaped_label}` {{name: $name}}) RETURN p.name",
name="Alice",
database_="neo4j"
)
另一种避免字符串拼接的变通方法是使用 APOC 过程,例如 apoc.merge.node
,它支持动态标签和属性键。
apoc.merge.node
创建具有动态标签/属性键的节点。property_key = "name"
label = "Person"
driver.execute_query(
"CALL apoc.merge.node($labels, $properties)",
labels=[label], properties={property_key: "Alice"},
database_="neo4j"
)
如果您在 Docker 中运行 Neo4j,则在启动容器时需要启用 APOC。请参阅 APOC → 安装 → Docker。 |
日志记录
驱动程序通过原生的 logging
库将消息记录到名为 neo4j
的日志记录器。要将日志消息重定向到标准输出,请使用 watch 函数
import sys
from neo4j.debug import watch
watch("neo4j", out=sys.stdout)
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> created, routing address IPv4Address(('localhost', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> acquire routing connection, access_mode='WRITE', database='neo4j'
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <ROUTING> checking table freshness (readonly=False): table expired=True, has_server_for_mode=False, table routers={IPv4Address(('localhost', 7687))} => False
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> attempting to update routing table from IPv4Address(('localhost', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <RESOLVE> in: localhost:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <RESOLVE> dns resolver out: 127.0.0.1:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <POOL> _acquire router connection, database='neo4j', address=ResolvedIPv4Address(('127.0.0.1', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <POOL> trying to hand out new connection
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] C: <OPEN> 127.0.0.1:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,619 [#AF18] C: <MAGIC> 0x6060B017
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,619 [#AF18] C: <HANDSHAKE> 0x00000005 0x00020404 0x00000104 0x00000003
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,620 [#AF18] S: <HANDSHAKE> 0x00000005
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,620 [#AF18] C: HELLO {'user_agent': 'neo4j-python/5.6.0 Python/3.10.6-final-0 (linux)', 'routing': {'address': 'localhost:7687'}, 'scheme': 'basic', 'principal': 'neo4j', 'credentials': '*******'}
词汇表
- LTS
-
长期支持版本是保证支持数年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Cypher 上的出色过程 (APOC) 是一个包含许多无法在 Cypher 本身中轻松表达的函数的库。
- Bolt
-
Bolt 是 Neo4j 实例与驱动程序之间交互所使用的协议。它默认监听端口 7687。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使发生故障也能保持准确和一致。
- 最终一致性
-
如果数据库提供所有集群成员将在某个时间点存储最新版本数据的保证,则该数据库是最终一致的。
- 因果一致性
-
如果集群的每个成员都以相同的顺序看到读写查询,则数据库是因果一致的。这比最终一致性更强。
- NULL
-
null 标记不是一种类型,而是值缺失的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是一个工作单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免资金从一个账户中扣除但未添加到另一个账户中。
- 反压
-
反压是抵抗数据流动的力。它确保客户端不会被超出其处理能力的数据量所淹没。
- 事务函数
-
事务函数是由
execute_read
或execute_write
调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。 - 驱动程序
-
一个
Driver
对象包含与 Neo4j 数据库建立连接所需的详细信息。