运行您自己的事务

使用 execute_query() 查询数据库时,驱动程序会自动创建一个事务。事务是一个工作单元,要么整体提交,要么在失败时回滚。您可以在单个查询中包含多个 Cypher 语句,例如在使用 MATCHCREATE 序列更新数据库时,但您不能有多个查询并在它们之间交错一些客户端逻辑。

对于这些更高级的用例,驱动程序提供了函数来完全控制事务生命周期。这些被称为托管事务,您可以将其视为解开 execute_query() 流程并能够在更多地方指定其所需行为的一种方式。

创建会话

在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间的具体查询通道,并确保强制执行因果一致性

会话通过方法Driver.session()创建,其中关键字参数 database 允许指定目标数据库。有关更多参数,请参阅会话配置

with driver.session(database="neo4j") as session:
    ...

会话创建是一个轻量级操作,因此创建和销毁会话无需付出显著成本。使用完毕后务必关闭会话

会话*不是*线程安全的:您可以在线程之间共享主 Driver 对象,但请确保每个线程创建自己的会话。

运行托管事务

一个事务可以包含任意数量的查询。由于 Neo4j 符合 ACID事务中的查询要么整体执行,要么完全不执行:您无法让事务的一部分成功而另一部分失败。使用事务将相关查询组合在一起,这些查询协同工作以实现单个逻辑数据库操作。

托管事务通过方法Session.execute_read()Session.execute_write()创建,具体取决于您是想从数据库中检索数据还是修改数据。这两种方法都接受一个事务函数回调,该回调负责实际执行查询和处理结果。

检索名字以 Al 开头的人。
def match_person_nodes(tx, name_filter): (3)
    result = tx.run(""" (4)
        MATCH (p:Person) WHERE p.name STARTS WITH $filter
        RETURN p.name AS name ORDER BY name
        """, filter=name_filter)
    return list(result)  # return a list of Record objects (5)

with driver.session(database="neo4j") as session:  (1)
    people = session.execute_read(  (2)
        match_person_nodes,
        "Al",
    )
    for person in people:
        print(person.data())  # obtain dict representation
1 创建会话。单个会话可以包含多个查询。除非使用 with 结构创建,否则请在使用完毕后记得关闭它。
2 .execute_read()(或 .execute_write())方法是事务的入口点。它接受一个指向事务函数的回调以及任意数量的位置参数和关键字参数,这些参数将传递给事务函数。
3 事务函数回调负责运行查询。
4 使用方法Transaction.run()来运行查询。每次查询运行都会返回一个Result对象。
5 使用 Result 上的任何方法处理结果

不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数

事务函数不应直接返回 Result 对象。相反,请务必以某种方式处理结果;至少,将其转换为列表。在事务函数中,return 语句会导致事务被提交,而如果抛出异常,事务则会自动回滚。

.execute_read().execute_write() 方法已取代 .read_transaction().write_transaction(),后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。
一个包含多个查询、客户端逻辑和潜在回滚的事务
from neo4j import GraphDatabase


URI = "{neo4j-database-uri}"
AUTH = ("{neo4j-username}", "{neo4j-password}")
employee_threshold=10


def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        with driver.session(database="neo4j") as session:
            for i in range(100):
                name = f"Thor{i}"
                org_id = session.execute_write(employ_person_tx, name)
                print(f"User {name} added to organization {org_id}")


def employ_person_tx(tx, name):
    # Create new Person node with given name, if not exists already
    result = tx.run("""
        MERGE (p:Person {name: $name})
        RETURN p.name AS name
        """, name=name
    )

    # Obtain most recent organization ID and the number of people linked to it
    result = tx.run("""
        MATCH (o:Organization)
        RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employees_n
        ORDER BY o.created_date DESC
        LIMIT 1
    """)
    org = result.single()

    if org is not None and org["employees_n"] == 0:
        raise Exception("Most recent organization is empty.")
        # Transaction will roll back -> not even Person is created!

    # If org does not have too many employees, add this Person to that
    if org is not None and org.get("employees_n") < employee_threshold:
        result = tx.run("""
            MATCH (o:Organization {id: $org_id})
            MATCH (p:Person {name: $name})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN $org_id AS id
            """, org_id=org["id"], name=name
        )

    # Otherwise, create a new Organization and link Person to it
    else:
        result = tx.run("""
            MATCH (p:Person {name: $name})
            CREATE (o:Organization {id: randomuuid(), created_date: datetime()})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN o.id AS id
            """, name=name
        )

    # Return the Organization ID to which the new Person ends up in
    return result.single()["id"]


if __name__ == "__main__":
    main()

如果事务因驱动程序认为的暂时性原因而失败,它会自动重试运行事务函数(并以指数级增长的延迟)。因此,事务函数必须是幂等(即,它们在多次运行时应产生相同的效果),因为您无法预先知道它们将执行多少次。实际上,这意味着您不应该编辑或依赖全局变量等。请注意,尽管事务函数可能会执行多次,但其中的查询将始终只运行一次。

一个会话可以链接多个事务,但在任何给定时间,一个会话中只能有一个活跃事务。要维护多个并发事务,请使用多个并发会话。

事务函数配置

装饰器unit_of_work()允许对事务函数施加进一步的控制。它允许指定

  • 事务超时(秒)。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒(0.001)。

  • 一个附加到事务的元数据字典。这些元数据会被记录在服务器的 query.log 中,并可在 SHOW TRANSACTIONS Cypher 命令的输出中看到。用此来标记事务。

from neo4j import unit_of_work

@unit_of_work(timeout=5, metadata={"app_name": "people_tracker"})
def count_people(tx):
    result = tx.run("MATCH (a:Person) RETURN count(a) AS people")
    record = result.single()
    return record["people"]


with driver.session(database="neo4j") as session:
    people_n = session.execute_read(count_people)

运行显式事务

您可以通过使用方法Session.begin_transaction()手动开始事务来完全控制事务。然后,您可以使用方法Transaction.run()在显式事务中运行查询。

with driver.session(database="neo4j") as session:
    with session.begin_transaction() as tx:
        # use tx.run() to run queries and tx.commit() when done
        tx.run("<QUERY 1>")
        tx.run("<QUERY 2>")

        tx.commit()

显式事务可以使用Transaction.commit()提交,或使用Transaction.rollback()回滚。如果没有采取显式操作,驱动程序将在事务生命周期结束时自动回滚事务。

显式事务对于需要在同一事务中跨多个函数分发 Cypher 执行的应用程序,或需要在一个事务中运行多个查询但不需要托管事务提供的自动重试功能的应用程序最有用。

一个涉及外部 API 的显式事务示例
import neo4j


URI = "{neo4j-database-uri}"
AUTH = ("{neo4j-username}", "{neo4j-password}")


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        customer_id = create_customer(driver)
        other_bank_id = 42
        transfer_to_other_bank(driver, customer_id, other_bank_id, 999)


def create_customer(driver):
    result, _, _ = driver.execute_query("""
        MERGE (c:Customer {id: rand()})
        RETURN c.id AS id
    """, database_ = "neo4j")
    return result[0]["id"]


def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    with driver.session(database="neo4j") as session:
        with session.begin_transaction() as tx:
            if not customer_balance_check(tx, customer_id, amount):
                # give up
                return

            other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred => can't rollback anymore
            # (cannot rollback external services interactions)

            try:
                decrease_customer_balance(tx, customer_id, amount)
                tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise  # roll back


def customer_balance_check(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    record = result.single(strict=True)
    return record["sufficient"]


def other_bank_transfer_api(customer_id, other_bank_id, amount):
    # make some API call to other bank
    pass


def decrease_customer_balance(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    result.consume()


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)


if __name__ == "__main__":
    main()

处理查询结果

驱动程序的查询输出是一个Result对象,它将 Cypher 结果封装在一个丰富的数据结构中,这需要在客户端进行一些解析。需要注意的两个主要点是:

  • 结果记录不是立即且完全由服务器获取和返回的。相反,结果以惰性流的形式返回。特别是,当驱动程序从服务器接收到一些记录时,它们最初会在后台队列中缓冲。记录保留在缓冲区中,直到它们被应用程序消费,此时它们将从缓冲区中移除。当没有更多记录可用时,结果将耗尽

  • 结果充当游标。这意味着无法从流中检索先前的记录,除非您将其保存在辅助数据结构中。

下面的动画展示了单个查询的路径:它展示了驱动程序如何处理结果记录以及应用程序应如何处理结果。

处理结果最简单的方法是将其转换为列表,这会生成一个Record对象列表。否则,Result 对象实现了许多用于处理记录的方法。最常用的一些方法如下。

名称 描述

value(key=0, default=None)

将结果的其余部分作为列表返回。如果指定了 key,则只包含给定属性,而 default 允许为缺少该属性的节点指定一个值。

fetch(n)

从结果中返回最多 n 条记录。

single(strict=False)

返回下一个也是唯一一个剩余的记录,或 None。调用此方法总是会耗尽结果。

如果有一个以上(或不足一个)记录可用,

  • strict==False — 生成警告并返回这些记录中的第一个(如果有);

  • strict==True — 抛出 ResultNotSingleError 异常。

peek()

从结果中返回下一条记录而不消费它。这会将记录留在缓冲区中以供进一步处理。

data(*keys)

返回原始结果的 JSON 格式转储。仅用于调试/原型设计目的。

consume()

返回查询结果摘要。它会耗尽结果,因此只有在数据处理完成后才应调用。

graph()

将结果转换为图对象集合。请参阅转换为图

to_df(expand, parse_dates)

将结果转换为 Pandas Dataframe。请参阅转换为 Pandas Dataframe

有关 Result 方法的完整列表,请参阅API 文档 — Result

会话配置

数据库选择

您应该始终使用 database 参数明确指定数据库,即使在单数据库实例上也是如此。这允许驱动程序更高效地工作,因为它节省了到服务器解析主数据库的网络往返时间。如果没有给定数据库,则使用 Neo4j 实例设置中设置的默认数据库

with driver.session(
    database="neo4j"
) as session:
    ...
通过配置方法指定数据库优于使用 USE Cypher 子句。如果服务器运行在集群上,带有 USE 的查询需要启用服务器端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含所请求数据库的集群成员。

请求路由

在集群环境中,所有会话都以写入模式打开,并路由到领导者。您可以通过将 default_access_mode 参数明确设置为 neo4j.READ_ACCESSneo4j.WRITE_ACCESS 来更改此设置。请注意,.execute_read().execute_write() 会自动覆盖会话的默认访问模式。

import neo4j

with driver.session(
    database="neo4j",
    default_access_mode=neo4j.READ_ACCESS
) as session:
    ...

尽管在读取模式下执行写入查询很可能会导致运行时错误,但您不应依赖此进行访问控制。两种模式的区别在于,读取事务被路由到集群的任何节点,而写入事务则定向到主节点。换句话说,无法保证在读取模式下提交的写入查询会被拒绝。

类似说明也适用于 .executeRead().executeWrite() 方法。

以不同用户身份运行查询

您可以使用 auth 参数通过不同用户执行查询。在会话级别切换用户比创建新的 Driver 对象更便宜。查询随后在给定用户的安全上下文中运行(即主数据库、权限等)。
会话范围的身份验证需要服务器版本 >= 5.8。

with driver.session(
    database="neo4j",
    auth=("somebody_else", "their_password")
) as session:
    ...

参数 impersonated_user 提供了类似的功能,并且在驱动程序/服务器版本 >= 4.4 中可用。区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver 的用户需要拥有相应的权限

with driver.session(
    database="neo4j",
    impersonated_user="somebody_else"
) as session:
    ...

关闭会话

每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话。因此,建议使用 with 语句创建会话,这样在应用程序使用完毕后会自动关闭它们。当会话关闭时,它会被返回到连接池以便后续重用。

如果您不使用 with,请在使用完会话后记得调用 .close() 方法。

session = driver.session(database="neo4j")

# session usage

session.close()

术语表

LTS

长期支持版本是指保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含许多无法在 Cypher 本身轻松表达的函数的库。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。它默认监听端口 7687。

ACID

原子性、一致性、隔离性、持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也保持准确和一致。

最终一致性

如果数据库保证所有集群成员将在某个时间点存储最新版本的数据,则该数据库是最终一致的。

因果一致性

如果读写查询按相同顺序被集群的每个成员看到,则数据库是因果一致的。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺少值的占位符。更多信息,请参阅Cypher → 处理 null

事务

事务是一个工作单元,要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但这些步骤必须全部成功或被撤销,以避免资金从一个账户中扣除但未添加到另一个账户的情况。

背压

背压是一种阻碍数据流动的力量。它确保客户端不会被数据淹没,即数据传输速度不会超过客户端的处理速度。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。在服务器故障时,驱动程序会自动重新执行该回调。

Driver

Driver 对象包含建立与 Neo4j 数据库连接所需的详细信息。

© . All rights reserved.