运行您自己的事务
当 使用 execute_query()
查询数据库 时,驱动程序会自动创建一个 _事务_。事务是工作的单位,它要么 _完全提交_,要么 _在失败时回滚_。您可以在单个查询中包含多个 Cypher 语句,例如,当按顺序使用 MATCH
和 CREATE
来 更新数据库 时,但您不能有多个查询,并且不能在它们之间交织一些客户端逻辑。
对于这些更高级的用例,驱动程序提供函数来完全控制事务生命周期。这些被称为 _托管事务_,您可以将它们视为一种展开 execute_query()
流程的方式,并能够在更多地方指定其所需行为。
创建一个会话
在运行事务之前,您需要获得一个 _会话_。会话充当驱动程序和服务器之间具体的查询通道,并确保 因果一致性得到执行。
会话使用方法 Driver.session()
创建,关键字参数 database
允许指定 目标数据库。有关其他参数,请参见 会话配置。
with driver.session(database="neo4j") as session:
...
会话创建是一个轻量级操作,因此可以在没有重大成本的情况下创建和销毁会话。完成使用会话后,请始终 关闭会话。
**会话 _不是_ 线程安全的**:您可以在线程之间共享主 Driver
对象,但确保每个线程都创建自己的会话。
运行托管事务
事务可以包含任意数量的查询。由于 Neo4j 符合 ACID 标准,**事务内的查询将要么整体执行,要么根本不执行**:您无法获得事务的一部分成功而另一部分失败。使用事务将一起工作的相关查询组合在一起,以实现单个 _逻辑_ 数据库操作。
托管事务使用方法 Session.execute_read()
和 Session.execute_write()
创建,具体取决于您是要从数据库中检索数据还是更改数据。这两种方法都接受一个 _事务函数_ 回调,该回调负责实际执行查询和处理结果。
Al
开头的人。def match_person_nodes(tx, name_filter): (3)
result = tx.run(""" (4)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
""", filter=name_filter)
return list(result) # return a list of Record objects (5)
with driver.session(database="neo4j") as session: (1)
people = session.execute_read( (2)
match_person_nodes,
"Al",
)
for person in people:
print(person.data()) # obtain dict representation
1 | 创建一个会话。单个会话可以是多个查询的容器。除非使用 with 结构创建,否则请记住在完成时关闭它。 |
2 | .execute_read() (或 .execute_write() )方法是事务的入口点。它接受一个 _事务函数_ 的回调,以及任意数量的位置参数和关键字参数,这些参数会传递给事务函数。 |
3 | 事务函数回调负责运行查询。 |
4 | 使用方法 Transaction.run() 运行查询。运行的每个查询都返回一个 Result 对象。 |
5 | 处理结果,使用 Result 上的任何方法。 |
**不要将参数直接硬编码或连接到查询中**。出于性能和安全原因,请改用 查询参数。
**事务函数不应直接返回 Result
对象**。相反,请始终 处理结果;至少,将其转换为列表。在事务函数中,return
语句会导致事务提交,而如果引发异常,则事务会自动回滚。
.execute_read() 和 .execute_write() 方法已取代 .read_transaction() 和 .write_transaction() ,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
from neo4j import GraphDatabase
URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")
employee_threshold=10
def main():
with GraphDatabase.driver(URI, auth=AUTH) as driver:
with driver.session(database="neo4j") as session:
for i in range(100):
name = f"Thor{i}"
org_id = session.execute_write(employ_person_tx, name)
print(f"User {name} added to organization {org_id}")
def employ_person_tx(tx, name):
# Create new Person node with given name, if not exists already
result = tx.run("""
MERGE (p:Person {name: $name})
RETURN p.name AS name
""", name=name
)
# Obtain most recent organization ID and the number of people linked to it
result = tx.run("""
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employees_n
ORDER BY o.created_date DESC
LIMIT 1
""")
org = result.single()
if org is not None and org["employees_n"] == 0:
raise Exception("Most recent organization is empty.")
# Transaction will roll back -> not even Person is created!
# If org does not have too many employees, add this Person to that
if org is not None and org.get("employees_n") < employee_threshold:
result = tx.run("""
MATCH (o:Organization {id: $org_id})
MATCH (p:Person {name: $name})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN $org_id AS id
""", org_id=org["id"], name=name
)
# Otherwise, create a new Organization and link Person to it
else:
result = tx.run("""
MATCH (p:Person {name: $name})
CREATE (o:Organization {id: randomuuid(), created_date: datetime()})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN o.id AS id
""", name=name
)
# Return the Organization ID to which the new Person ends up in
return result.single()["id"]
if __name__ == "__main__":
main()
如果事务因驱动程序认为是瞬态的原因而失败,它会自动重试运行事务函数(以指数级增加的延迟)。因此,**事务函数必须是 _幂等的_**(即,它们在运行多次时应产生相同的效果),因为您事先不知道它们将执行多少次。在实践中,这意味着您不应编辑或依赖全局变量,例如。请注意,虽然事务函数可能会执行多次,但它内部的查询始终只会执行一次。
会话可以链接多个事务,但**在任何给定时间内,会话中只能有一个事务处于活动状态**。若要维护多个并发事务,请使用多个并发会话。
事务函数配置
装饰器 unit_of_work()
允许对事务函数进行进一步控制。它允许指定
-
事务超时(以秒为单位)。运行时间更长的事务将被服务器终止。默认值在服务器端设置。最小值为 1 毫秒(
0.001
)。 -
一个附加到事务的元数据字典。这些元数据记录在服务器
query.log
中,并在SHOW TRANSACTIONS
Cypher 命令的输出中可见。使用此选项标记事务。
from neo4j import unit_of_work
@unit_of_work(timeout=5, metadata={"app_name": "people_tracker"})
def count_people(tx):
result = tx.run("MATCH (a:Person) RETURN count(a) AS people")
record = result.single()
return record["people"]
with driver.session(database="neo4j") as session:
people_n = session.execute_read(count_people)
运行显式事务
您可以通过使用方法 Session.begin_transaction()
手动开始事务来获得对事务的完全控制。然后,您可以在显式事务中使用方法 Transaction.run()
运行查询。
with driver.session(database="neo4j") as session:
with session.begin_transaction() as tx:
# use tx.run() to run queries and tx.commit() when done
tx.run("<QUERY 1>")
tx.run("<QUERY 2>")
tx.commit()
显式事务可以使用 Transaction.commit()
提交,也可以使用 Transaction.rollback()
回滚。如果未采取任何显式操作,驱动程序将在其生命周期结束时自动回滚事务。
显式事务最适用于需要将 Cypher 执行分布到同一事务的多个函数中的应用程序,或者适用于需要在单个事务中运行多个查询但不需要托管事务提供的自动重试的应用程序。
import neo4j
URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")
def main():
with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
customer_id = create_customer(driver)
other_bank_id = 42
transfer_to_other_bank(driver, customer_id, other_bank_id, 999)
def create_customer(driver):
result, _, _ = driver.execute_query("""
MERGE (c:Customer {id: rand()})
RETURN c.id AS id
""", database_ = "neo4j")
return result[0]["id"]
def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
with driver.session(database="neo4j") as session:
with session.begin_transaction() as tx:
if not customer_balance_check(tx, customer_id, amount):
# give up
return
other_bank_transfer_api(customer_id, other_bank_id, amount)
# Now the money has been transferred => can't rollback anymore
# (cannot rollback external services interactions)
try:
decrease_customer_balance(tx, customer_id, amount)
tx.commit()
except Exception as e:
request_inspection(customer_id, other_bank_id, amount, e)
raise # roll back
def customer_balance_check(tx, customer_id, amount):
query = ("""
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
""")
result = tx.run(query, id=customer_id, amount=amount)
record = result.single(strict=True)
return record["sufficient"]
def other_bank_transfer_api(customer_id, other_bank_id, amount):
# make some API call to other bank
pass
def decrease_customer_balance(tx, customer_id, amount):
query = ("""
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
""")
result = tx.run(query, id=customer_id, amount=amount)
result.consume()
def request_inspection(customer_id, other_bank_id, amount, e):
# manual cleanup required; log this or similar
print("WARNING: transaction rolled back due to exception:", repr(e))
print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
"amount:", amount)
if __name__ == "__main__":
main()
处理查询结果
驱动程序对查询的输出是一个 Result
对象,它将 Cypher 结果封装在一个丰富的
-
服务器不会立即或完全地获取和返回结果记录。相反,结果会以延迟流的形式返回。具体来说,当驱动程序从服务器接收到一些记录时,这些记录最初会缓冲在一个后台队列中。记录会保留在缓冲区中,直到被应用程序消费,此时它们会被从缓冲区中删除。当没有更多记录可用时,结果就耗尽了。
-
结果充当游标。这意味着无法从流中检索之前的记录,除非您将其保存在辅助数据结构中。
下面的动画展示了单个查询的路径:它展示了驱动程序如何处理结果记录以及应用程序如何处理结果。
处理结果的最简单方法是将其转换为列表,这将生成一个包含 Record
对象的列表。否则,Result
对象将实现一些用于处理记录的方法。最常用的方法列在下面。
名称 | 描述 |
---|---|
|
将结果的剩余部分作为列表返回。如果指定了 |
|
从结果中返回最多 |
|
返回下一个且唯一的剩余记录,或 如果有多于(或少于)一条记录可用,则
|
|
从结果中返回下一条记录,但不消费它。这会将记录保留在缓冲区中以供进一步处理。 |
|
返回原始结果的类似 JSON 的转储。仅用于调试/原型设计目的。 |
|
返回查询的 结果摘要。它会耗尽结果,因此应仅在数据处理完成后调用。 |
|
将结果转换为图形对象集合。请参阅 转换为图形。 |
|
将结果转换为 Pandas 数据框。请参阅 转换为 Pandas 数据框。 |
有关Result
方法的完整列表,请参阅 API 文档 — 结果。
会话配置
数据库选择
您应该始终使用database
参数显式指定数据库,即使在单数据库实例上也是如此。这使驱动程序能够更有效地工作,因为它可以节省与服务器进行网络往返以解析主数据库的时间。如果未提供数据库,则使用 Neo4j 实例设置中设置的 默认数据库。
with driver.session(
database="neo4j"
) as session:
...
通过配置方法指定数据库优先于 USE Cypher 子句。如果服务器在集群上运行,使用USE 的查询需要启用服务器端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含请求数据库的成员。 |
请求路由
在集群环境中,所有会话都以写入模式打开,将其路由到领导者。您可以通过将default_access_mode
参数显式设置为neo4j.READ_ACCESS
或neo4j.WRITE_ACCESS
来更改此设置。请注意,.execute_read()
和.execute_write()
会自动覆盖会话的默认访问模式。
import neo4j
with driver.session(
database="neo4j",
default_access_mode=neo4j.READ_ACCESS
) as session:
...
尽管在读取模式下执行写入查询可能会导致运行时错误,但您不应该依赖此功能来进行访问控制。 两种模式之间的区别在于,读取事务会路由到集群的任何节点,而写入事务会路由到领导者。换句话说,不能保证在读取模式下提交的写入查询会被拒绝。 类似的说明适用于 |
以不同的用户身份运行查询(模拟)
您可以使用参数impersonated_user
在不同用户的安全上下文中执行查询,指定要模拟的用户名称。为此,创建Driver
对象的用户需要具有 相应的权限。模拟用户比创建新的Driver
对象更便宜。
with driver.session(
database="neo4j",
impersonated_user="somebody_else"
) as session:
...
模拟用户时,查询将在模拟用户的完整安全上下文中运行,而不是在经过身份验证的用户(即主数据库、权限等)的上下文中运行。
关闭会话
每个连接池都有有限数量的会话,因此,如果您打开会话而不关闭它们,应用程序可能会耗尽会话。因此,建议使用with
语句创建会话,该语句会在应用程序完成使用会话后自动关闭它们。关闭会话后,它将返回到连接池中以供稍后重用。
如果您不使用with
,请记住在完成使用会话后调用.close()
方法。
session = driver.session(database="neo4j")
# session usage
session.close()
术语表
- LTS
-
长期支持版本是保证支持若干年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将具有 LTS 版本。
- Aura
-
Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图形查询语言,它允许您从数据库中检索数据。它类似于 SQL,但适用于图形。
- APOC
-
Cypher 上的强大程序 (APOC) 是一个包含(许多)函数的库,这些函数无法在 Cypher 本身中轻松表达。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认情况下,它侦听端口 7687。
- ACID
-
原子性、一致性、隔离性、持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 可确保数据库中的数据即使在发生故障时也能保持准确性和一致性。
- 最终一致性
-
如果数据库提供保证,即所有集群成员将在某个时间点存储数据的最新版本,则该数据库是最终一致的。
- 因果一致性
-
如果数据库保证读取和写入查询以相同的顺序由集群中的每个成员看到,则该数据库是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是类型,而是值缺失的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是工作单元,可以完全提交,也可以在失败时回滚。一个示例是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免从一个帐户中减去资金,而没有将其添加到另一个帐户中。
- 背压
-
背压是与数据流相反的力。它确保客户端不会因超出其处理能力的数据速度而不堪重负。
- 事务函数
-
事务函数是由
execute_read
或execute_write
调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行回调。 - 驱动程序
-
Driver
对象包含建立与 Neo4j 数据库连接所需的详细信息。