协调并行事务

在使用 Neo4j 集群时,在大多数情况下默认强制执行因果一致性,这保证了查询能够读取先前查询所做的更改。然而,对于多个并行运行的事务,默认情况下不会发生这种情况。在这种情况下,您可以使用书签让一个事务等待另一个事务的结果在集群中传播后再运行其自身的工作。这不是强制要求,并且您只有在需要不同事务之间的因果一致性时才应使用书签,因为等待书签可能会对性能产生负面影响。

书签是表示数据库某种状态的令牌。通过在查询中传递一个或多个书签,服务器将确保查询不会在所代表的状态建立之前执行。

.execute_query() 的书签

使用 .execute_query() 查询数据库时,驱动程序会为您管理书签。在这种情况下,您可以保证后续查询能够读取先前的更改,而无需采取进一步操作。

driver.execute_query("<QUERY 1>")

# subsequent execute_query calls will be causally chained

driver.execute_query("<QUERY 2>") # can read result of <QUERY 1>
driver.execute_query("<QUERY 3>") # can read result of <QUERY 2>

要禁用书签管理和因果一致性,请在 .execute_query() 调用中设置 bookmark_manager_=None

driver.execute_query(
    "<QUERY>",
    bookmark_manager_=None,
)

单一会话中的书签

对于在单一会话中运行的查询,书签管理会自动进行,因此您可以相信一个会话内的查询是因果关联的。

with driver.session() as session:
    session.execute_write(lambda tx: tx.run("<QUERY 1>"))
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))  # can read QUERY 1
    session.execute_write(lambda tx: tx.run("<QUERY 3>"))  # can read QUERY 1,2

跨多个会话的书签

如果您的应用程序使用多个会话,您可能需要确保一个会话已完成其所有事务,然后才允许另一个会话运行其查询。

在下面的示例中,session_asession_b 允许并发运行,而 session_c 则等待它们的结果传播完成后再执行。这保证了 session_c 想要操作的 Person 节点确实存在。

使用书签协调多个会话
from neo4j import GraphDatabase, Bookmarks


URI = "{neo4j-database-uri}"
AUTH = ("{neo4j-username}", "{neo4j-password}")

def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()
        create_some_friends(driver)


def create_some_friends(driver):
    saved_bookmarks = Bookmarks()  # To collect the sessions' bookmarks

    # Create the first person and employment relationship
    with driver.session(database="neo4j") as session_a:
        session_a.execute_write(create_person, "Alice")
        session_a.execute_write(employ, "Alice", "Wayne Enterprises")
        saved_bookmarks += session_a.last_bookmarks()  (1)

    # Create the second person and employment relationship
    with driver.session(database="neo4j") as session_b:
        session_b.execute_write(create_person, "Bob")
        session_b.execute_write(employ, "Bob", "LexCorp")
        saved_bookmarks += session_b.last_bookmarks()  (1)

    # Create a friendship between the two people created above
    with driver.session(
        database="neo4j", bookmarks=saved_bookmarks
    ) as session_c:  (2)
        session_c.execute_write(create_friendship, "Alice", "Bob")
        session_c.execute_read(print_friendships)


# Create a person node
def create_person(tx, name):
    tx.run("MERGE (:Person {name: $name})", name=name)


# Create an employment relationship to a pre-existing company node
# This relies on the person first having been created.
def employ(tx, person_name, company_name):
    tx.run("""
        MATCH (person:Person {name: $person_name})
        MATCH (company:Company {name: $company_name})
        CREATE (person)-[:WORKS_FOR]->(company)
        """, person_name=person_name, company_name=company_name
    )


# Create a friendship between two people
def create_friendship(tx, name_a, name_b):
    tx.run("""
        MATCH (a:Person {name: $name_a})
        MATCH (b:Person {name: $name_b})
        MERGE (a)-[:KNOWS]->(b)
        """, name_a=name_a, name_b=name_b
    )


# Retrieve and display all friendships
def print_friendships(tx):
    result = tx.run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
    for record in result:
        print("{} knows {}".format(record["a.name"], record["b.name"]))


if __name__ == "__main__":
    main()
1 使用 Session.last_bookmarks() 收集并合并来自不同会话的书签,将其存储在 Bookmarks 对象中。
2 使用它们通过 bookmarks 参数初始化另一个会话。

driver passing bookmarks

书签的使用可能会对性能产生负面影响,因为所有查询都将被强制等待最新更改在集群中传播。对于简单的用例,请尝试将查询分组到单个事务中,或单个会话中。

混合使用 .execute_query() 和会话

为了确保部分使用 .execute_query() 且部分使用会话执行的事务之间的因果一致性,您可以在会话创建时使用参数 bookmark_manager,并将其设置为 driver.execute_query_bookmark_manager。由于这是 .execute_query() 调用的默认书签管理器,这将确保所有工作都在相同的书签管理器下执行,从而实现因果一致性。

driver.execute_query("<QUERY 1>")

with driver.session(
    bookmark_manager=driver.execute_query_bookmark_manager
) as session:
    # every query inside this session will be causally chained
    # (i.e., can read what was written by <QUERY 1>)
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))

# subsequent execute_query calls will be causally chained
# (i.e., can read what was written by <QUERY 2>)
driver.execute_query("<QUERY 3>")

实现自定义 BookmarkManager

书签管理器是驱动程序用于跟踪书签并自动保持会话一致性的接口。

您可以继承 BookmarkManager 接口来实现自定义书签管理器,或者使用驱动程序通过 GraphDatabase.bookmark_manager() 提供的默认实现。在实现书签管理器时,请记住所有方法都必须是并发安全的。

接口的详细信息可在API 文档中找到。

术语表

LTS

长期支持 (LTS) 版本是保证支持多年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含许多无法在 Cypher 本身中轻松表达的函数的库。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。

ACID

原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据在发生故障时仍保持准确和一致。

最终一致性

如果数据库能够保证所有集群成员在某个时间点都将存储数据的最新版本,则该数据库是最终一致的。

因果一致性

如果读写查询在集群的每个成员上都以相同的顺序可见,则数据库是因果一致的。这比最终一致性更强。

NULL

空标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅Cypher → 使用 null

事务

事务是一个工作单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免资金从一个账户中扣除但未添加到另一个账户的情况。

反压

反压是阻碍数据流动的力。它确保客户端不会被数据淹没,速度快于其处理能力。

事务函数

事务函数是 execute_readexecute_write 调用执行的回调。在服务器故障时,驱动程序会自动重新执行该回调。

驱动程序

一个 Driver 对象包含建立与 Neo4j 数据库连接所需的详细信息。