协调并行事务

在使用 Neo4j 集群时,因果一致性 在大多数情况下默认情况下是强制执行的,这保证查询能够读取之前查询所做的更改。 然而,默认情况下,对于并行运行的多个事务,情况并非如此。 在这种情况下,您可以使用书签让一个事务在运行自己的工作之前等待另一个事务的结果传播到整个集群。 这不是一项要求,并且您应该只在需要不同事务之间的因果一致性时使用书签,因为等待书签会对性能产生负面影响。

书签是一个代表数据库某个状态的令牌。 通过将一个或多个书签与查询一起传递,服务器将确保查询在所代表的状态(s) 建立之前不会被执行。

.execute_query() 的书签

使用 .execute_query() 查询数据库时,驱动程序会为您管理书签。 在这种情况下,您可以保证后续查询能够读取之前的更改,无需采取进一步的措施。

driver.execute_query("<QUERY 1>")

# subsequent execute_query calls will be causally chained

driver.execute_query("<QUERY 2>") # can read result of <QUERY 1>
driver.execute_query("<QUERY 3>") # can read result of <QUERY 2>

要禁用书签管理和因果一致性,请在 .execute_query() 调用中将 bookmark_manager_=None 设置为 None

driver.execute_query(
    "<QUERY>",
    bookmark_manager_=None,
)

单个会话内的书签

书签管理在单个会话内运行的查询中自动发生,因此您可以相信一个会话内的查询是因果链式的。

with driver.session() as session:
    session.execute_write(lambda tx: tx.run("<QUERY 1>"))
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))  # can read QUERY 1
    session.execute_write(lambda tx: tx.run("<QUERY 3>"))  # can read QUERY 1,2

跨多个会话的书签

如果您的应用程序使用多个会话,您可能需要确保一个会话已完成其所有事务,然后才能允许另一个会话运行其查询。

在下面的示例中,session_asession_b 可以同时运行,而 session_c 则等待它们的结果传播。 这保证了 session_c 要操作的 Person 节点确实存在。

使用书签协调多个会话
from neo4j import GraphDatabase, Bookmarks


URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")

def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()
        create_some_friends(driver)


def create_some_friends(driver):
    saved_bookmarks = Bookmarks()  # To collect the sessions' bookmarks

    # Create the first person and employment relationship
    with driver.session(database="neo4j") as session_a:
        session_a.execute_write(create_person, "Alice")
        session_a.execute_write(employ, "Alice", "Wayne Enterprises")
        saved_bookmarks += session_a.last_bookmarks()  (1)

    # Create the second person and employment relationship
    with driver.session(database="neo4j") as session_b:
        session_b.execute_write(create_person, "Bob")
        session_b.execute_write(employ, "Bob", "LexCorp")
        saved_bookmarks += session_b.last_bookmarks()  (1)

    # Create a friendship between the two people created above
    with driver.session(
        database="neo4j", bookmarks=saved_bookmarks
    ) as session_c:  (2)
        session_c.execute_write(create_friendship, "Alice", "Bob")
        session_c.execute_read(print_friendships)


# Create a person node
def create_person(tx, name):
    tx.run("MERGE (:Person {name: $name})", name=name)


# Create an employment relationship to a pre-existing company node
# This relies on the person first having been created.
def employ(tx, person_name, company_name):
    tx.run("""
        MATCH (person:Person {name: $person_name})
        MATCH (company:Company {name: $company_name})
        CREATE (person)-[:WORKS_FOR]->(company)
        """, person_name=person_name, company_name=company_name
    )


# Create a friendship between two people
def create_friendship(tx, name_a, name_b):
    tx.run("""
        MATCH (a:Person {name: $name_a})
        MATCH (b:Person {name: $name_b})
        MERGE (a)-[:KNOWS]->(b)
        """, name_a=name_a, name_b=name_b
    )


# Retrieve and display all friendships
def print_friendships(tx):
    result = tx.run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
    for record in result:
        print("{} knows {}".format(record["a.name"], record["b.name"]))


if __name__ == "__main__":
    main()
1 使用 Session.last_bookmarks() 收集和组合来自不同会话的书签,将它们存储在 Bookmarks 对象中。
2 使用它们使用 bookmarks 参数初始化另一个会话。

driver passing bookmarks

使用书签可能会对性能产生负面影响,因为所有查询都必须等待最新的更改传播到整个集群。 对于简单的用例,请尝试将查询分组到单个事务中,或分组到单个会话中。

混合 .execute_query() 和会话

为了确保使用 .execute_query() 部分执行的事务和使用会话部分执行的事务之间的因果一致性,您可以在创建会话时使用参数 bookmark_manager,将其设置为 driver.execute_query_bookmark_manager。 由于这是 .execute_query() 调用默认的书签管理器,因此这将确保所有工作都在同一个书签管理器下执行,从而保证因果一致性。

driver.execute_query("<QUERY 1>")

with driver.session(
    bookmark_manager=driver.execute_query_bookmark_manager
) as session:
    # every query inside this session will be causally chained
    # (i.e., can read what was written by <QUERY 1>)
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))

# subsequent execute_query calls will be causally chained
# (i.e., can read what was written by <QUERY 2>)
driver.execute_query("<QUERY 3>")

实现自定义 BookmarkManager

书签管理器是驱动程序用于跟踪书签和保持会话自动一致的接口。

您可以对BookmarkManager 接口进行子类化以实现自定义书签管理器,或者使用驱动程序通过GraphDatabase.bookmark_manager() 提供的默认实现。 在实现书签管理器时,请记住所有方法都必须是并发安全的。

有关接口的详细信息,请参阅API 文档

词汇表

LTS

长期支持版本是指保证支持数年时间的版本。 Neo4j 4.4 是 LTS 版本,Neo4j 5 也将具有 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。 它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,它允许您从数据库中检索数据。 它类似于 SQL,但适用于图。

APOC

Cypher 上的绝佳程序 (APOC) 是一个包含(许多)函数的库,这些函数无法轻松地用 Cypher 本身表达。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。 它默认监听端口 7687。

ACID

原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。 符合 ACID 的 DBMS 确保即使在发生故障的情况下,数据库中的数据仍然准确且一致。

最终一致性

如果数据库保证所有集群成员在某个时间点都会存储数据的最新版本,则该数据库是最终一致的。

因果一致性

如果数据库保证所有集群成员以相同的顺序看到读写查询,则该数据库是因果一致的。 这比最终一致性更强。

NULL

空标记不是一种类型,而是一个表示没有值的占位符。 有关更多信息,请参阅Cypher → 使用 null

事务

事务是一个工作单元,要么完全提交,要么在发生故障时回滚。 例如,银行转账:它包含多个步骤,但它们必须全部成功或被还原,以避免从一个账户中减去资金,而没有将其添加到另一个账户中。

反压

反压是一种与数据流相反的力。 它确保客户端不会被比其能够处理的速度快的数据淹没。

事务函数

事务函数是 execute_readexecute_write 调用执行的回调函数。 驱动程序会在发生服务器故障时自动重新执行回调函数。

驱动程序

Driver 对象包含建立与 Neo4j 数据库连接所需的详细信息。