Cypher 工作流程

本节介绍如何创建工作单元并为该工作提供逻辑上下文。

概述

Neo4j 驱动程序公开了一个 Cypher 通道,数据库工作可以通过该通道进行(有关 Cypher 查询语言的更多信息,请参阅 Cypher 手册)。

工作本身组织为会话事务查询,如下所示

sessions queries transactions
图 1. 会话、查询和事务

会话始终绑定到单个事务上下文,这通常是一个单独的数据库。

使用书签机制,会话还能确保事务排序的正确性,即使事务发生在多个集群成员上也是如此。这种效果被称为因果链

会话

会话是用于因果链式事务序列的轻量级容器(参见操作手册 → 因果一致性)。它们本质上提供了以书签形式存储事务排序信息的上下文。

当事务开始时,包含该事务的会话会从驱动程序连接池中获取一个连接。事务提交(或回滚)后,会话会再次释放该连接。这意味着只有当会话执行工作时,它才会占用连接资源。空闲时,不使用此类资源。

由于会话保证了排序,一个会话一次只能承载一个事务。对于并行执行,应使用多个会话。在线程安全存在问题的语言中,不应将会话视为线程安全的

关闭会话会强制任何打开的事务回滚,并且其关联的连接随后会被释放回连接池。

会话绑定到单个事务上下文,并在构建时指定。Neo4j 将每个数据库在其自己的上下文中公开,从而在设计上禁止跨数据库事务(或会话)。同样,绑定到不同数据库的会话不能通过在它们之间传播书签来因果链式连接

各个语言驱动程序提供了多种会话类,每种都围绕特定的编程风格。每个会话类都提供了一组类似的功能,但根据应用程序的结构以及是否使用了框架,为客户端应用程序提供了选择。

会话类在会话 API 中描述。更多详细信息,请参阅API 文档

事务

事务是包含一个或多个 Cypher 查询原子工作单元。事务可能包含读取或写入工作,通常会被路由到适当的服务器执行,并在那里完整执行。如果事务失败,需要从头开始重试事务。这是事务管理器的职责。

Neo4j 驱动程序通过事务函数机制提供事务管理。此机制通过 Session 对象上的方法公开,这些方法接受一个函数对象,该对象可以在不同服务器上多次执行,直到成功或达到超时。此方法推荐用于大多数客户端应用程序。

一个方便的简短替代方案自动提交事务机制。它为单查询事务提供了一种有限形式的事务管理,以代码开销稍小为代价。这种事务形式适用于快速脚本和不需要高可用性保证的环境。它也是运行Cypher 手册 → CALL {} IN TRANSACTIONS 查询所需的事务形式,这是唯一一种管理自身事务的 Cypher 查询类型。

此外,还提供了一个更低级的非托管事务 API,用于高级用例。当客户端应用替代事务管理层,需要以自定义方式管理错误处理和重试时,此 API 会很有用。

要了解有关如何使用非托管事务的更多信息,请参阅API 文档

查询和结果

查询包括向服务器请求执行 Cypher 语句,然后将结果响应回客户端。结果以记录流的形式传输,并附带头部和尾部元数据,客户端应用程序可以增量消费。凭借响应式能力,记录流的语义可以通过允许 Cypher 结果在执行过程中暂停或取消来增强。

要执行 Cypher 查询,需要查询文本以及一组可选的命名参数。文本可以包含在运行时替换为相应值的参数占位符。虽然可以运行非参数化 Cypher 查询,但良好的编程实践是尽可能在 Cypher 查询中使用参数。这允许在 Cypher 引擎中缓存查询,这有利于性能。参数值应符合Cypher 值

通常还会提供结果摘要。这包含与查询执行和结果内容相关的附加信息。对于 EXPLAINPROFILE 查询,这里会返回查询计划。有关这些查询的更多信息,请参阅Cypher 手册 → 分析查询

因果链和书签

在使用因果集群时,事务可以通过会话进行链式连接,以确保因果一致性。这意味着对于任意两个事务,可以保证第二个事务仅在第一个事务成功提交后才开始。即使事务在不同的物理集群成员上执行,这也成立。有关因果集群的更多信息,请参阅操作手册 → 集群

在内部,因果链通过在事务之间传递书签来实现。每个书签记录特定数据库事务历史中的一个或多个点,可用于通知集群成员按特定顺序执行工作单元。收到书签后,服务器将阻塞,直到它追赶上相关事务时间点。

在开始新事务时,初始书签从客户端发送到服务器,并在成功完成后返回最终书签。请注意,这适用于读取和写入事务。

书签传播在会话内自动进行,不需要应用程序的任何明确信号或设置。要退出此机制以处理不相关的工作单元,应用程序可以使用多个会话。这避免了因果链的小延迟开销。

可以通过从一个会话中提取最后一个书签并将其传递给另一个会话的构造函数,从而在会话之间传递书签。如果一个事务有多个逻辑前驱,也可以合并多个书签。请注意,只有在跨会话链式连接时,应用程序才需要直接使用书签。

driver passing bookmarks
图 2. 传递书签
示例 1. 传递书签
// Create a company node
private IResultSummary AddCompany(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Company {name: $name})", new { name }).Consume();
}

// Create a person node
private IResultSummary AddPerson(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume();
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
private IResultSummary Employ(IQueryRunner tx, string personName, string companyName)
{
    return tx.Run(
            @"MATCH (person:Person {name: $personName}) 
                 MATCH (company:Company {name: $companyName}) 
                 CREATE (person)-[:WORKS_FOR]->(company)",
            new { personName, companyName })
        .Consume();
}

// Create a friendship between two people.
private IResultSummary MakeFriends(IQueryRunner tx, string name1, string name2)
{
    return tx.Run(
            @"MATCH (a:Person {name: $name1}) 
                 MATCH (b:Person {name: $name2})
                 MERGE (a)-[:KNOWS]->(b)",
            new { name1, name2 })
        .Consume();
}

// Match and display all friendships.
private int PrintFriendships(IQueryRunner tx)
{
    var result = tx.Run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name");

    var count = 0;
    foreach (var record in result)
    {
        count++;
        Console.WriteLine($"{record["a.name"]} knows {record["b.name"]}");
    }

    return count;
}

public void AddEmployAndMakeFriends()
{
    // To collect the session bookmarks
    var savedBookmarks = new List<Bookmarks>();

    // Create the first person and employment relationship.
    using (var session1 = Driver.Session(o => o.WithDefaultAccessMode(AccessMode.Write)))
    {
        session1.ExecuteWrite(tx => AddCompany(tx, "Wayne Enterprises"));
        session1.ExecuteWrite(tx => AddPerson(tx, "Alice"));
        session1.ExecuteWrite(tx => Employ(tx, "Alice", "Wayne Enterprises"));

        savedBookmarks.Add(session1.LastBookmarks);
    }

    // Create the second person and employment relationship.
    using (var session2 = Driver.Session(o => o.WithDefaultAccessMode(AccessMode.Write)))
    {
        session2.ExecuteWrite(tx => AddCompany(tx, "LexCorp"));
        session2.ExecuteWrite(tx => AddPerson(tx, "Bob"));
        session2.ExecuteWrite(tx => Employ(tx, "Bob", "LexCorp"));

        savedBookmarks.Add(session2.LastBookmarks);
    }

    // Create a friendship between the two people created above.
    using (var session3 = Driver.Session(
               o =>
                   o.WithDefaultAccessMode(AccessMode.Write).WithBookmarks(savedBookmarks.ToArray())))
    {
        session3.ExecuteWrite(tx => MakeFriends(tx, "Alice", "Bob"));

        session3.ExecuteRead(PrintFriendships);
    }
}

使用访问模式路由事务

事务可以以模式执行;这被称为访问模式。在因果集群中,每个事务将根据其模式路由到适当的服务器。当使用单个实例时,所有事务都将传递到该服务器。

通过识别读写来路由 Cypher 可以提高可用集群资源的利用率。由于读服务器通常比写服务器更充足,因此将读流量导向读服务器而不是写服务器是有益的。这样做有助于保持写服务器可用于写事务。

访问模式通常由调用事务函数的方法指定。会话类提供了一个用于调用读取的方法和另一个用于写入的方法。

作为自动提交非托管事务的备用方案,还可以在会话级别提供默认访问模式。这仅在无法通过其他方式指定访问模式的情况下使用。如果在该会话中使用事务函数,则默认访问模式将被覆盖

驱动程序不解析 Cypher,因此无法自动确定事务是用于执行读取还是写入操作。因此,标记为读取的写入事务仍将发送到读取服务器,但在执行时将失败。
示例 2. 读写事务
public long AddPerson(string name)
{
    using var session = Driver.Session();
    session.ExecuteWrite(tx => CreatePersonNode(tx, name));
    return session.ExecuteRead(tx => MatchPersonNode(tx, name));
}

private static IResultSummary CreatePersonNode(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume();
}

private static long MatchPersonNode(IQueryRunner tx, string name)
{
    var result = tx.Run("MATCH (a:Person {name: $name}) RETURN id(a)", new { name });
    return result.Single()[0].As<long>();
}

数据库和执行上下文

Neo4j 提供了在同一个 DBMS 中使用多个数据库的能力。

对于社区版,这限制为一个用户数据库,以及 system 数据库。

从驱动程序 API 的角度来看,会话具有 DBMS 范围,并且可以在会话构建时选择会话的默认数据库。默认数据库用作未通过 USE 子句明确指定数据库的查询的目标。有关 USE 子句的详细信息,请参阅Cypher 手册 → USE

在多数据库环境中,服务器将一个数据库标记为默认。每当创建会话时未指定特定数据库为默认时,都会选择此数据库。在单个数据库环境中,该数据库始终是默认数据库。

有关在同一 DBMS 中管理多个数据库的更多信息,请参阅Cypher 手册 → Neo4j 数据库和图,其中全面介绍了 Neo4j 数据存储层次结构。

以下示例说明了如何使用数据库

var session = driver.session(SessionConfig.forDatabase( "foo" ))
// lists nodes from database foo
session.run("MATCH (n) RETURN n").list()

// lists nodes from database bar
session.run("USE bar MATCH (n) RETURN n").list()

// creates an index in foo
session.run("CREATE INDEX foo_idx FOR (n:Person) ON n.id").consume()

// creates an index in bar
session.run("USE bar CREATE INDEX bar_idx FOR (n:Person) ON n.id").consume()

// targets System database
session.run("SHOW USERS")

数据库选择

在会话创建期间,您将数据库名称传递给驱动程序。如果您未指定名称,则使用默认数据库。数据库名称不能为 null,也不能为空字符串。

仅当驱动程序连接到 Neo4j 企业版时才可能选择数据库。在 Neo4j 社区版中更改为除默认数据库以外的任何其他数据库都将导致运行时错误。

以下示例说明了 DBMS Cypher 手册 → 事务的概念,并展示了如何在一次驱动程序事务中向多个数据库发出查询。它附有注释,描述了每个操作如何影响数据库事务。

var session = driver.session(SessionConfig.forDatabase( "foo" ))
// a DBMS-level transaction is started
var transaction = session.begin()

// a transaction on database "foo" is started automatically with the first query targeting foo
transaction.run("MATCH (n) RETURN n").list()

// a transaction on database "bar" is started
transaction.run("USE bar MATCH (n) RETURN n").list()

// executes in the transaction on database "foo"
transaction.run("RETURN 1").consume()

// executes in the transaction on database "bar"
transaction.run("USE bar RETURN 1").consume()

// commits the DBMS-level transaction which commits the transactions on databases "foo" and
// "bar"
transaction.commit()

请注意,请求的数据库必须存在。

示例 3. 在会话创建时选择数据库
using (var session = _driver.Session(SessionConfigBuilder.ForDatabase("examples")))
{
    session.Run("CREATE (a:Greeting {message: 'Hello, Example-Database'}) RETURN a").Consume();
}

void SessionConfig(SessionConfigBuilder configBuilder)
{
    configBuilder.WithDatabase("examples")
        .WithDefaultAccessMode(AccessMode.Read)
        .Build();
}

using (var session = _driver.Session(SessionConfig))
{
    var result = session.Run("MATCH (a:Greeting) RETURN a.message as msg");
    var msg = result.Single()[0].As<string>();
    Console.WriteLine(msg);
}

类型映射

驱动程序在应用程序语言类型Cypher 类型之间进行转换。

要传递参数和处理结果,了解 Cypher 如何处理类型以及 Cypher 类型如何在驱动程序中映射的基础知识非常重要。

下表显示了可用的数据类型。所有类型都可能出现在结果中,但并非所有类型都可以用作参数。

Cypher 类型 参数 结果

NULL*

LIST

MAP

BOOLEAN

INTEGER

FLOAT

STRING

ByteArray

DATE

ZONED TIME

LOCAL TIME

ZONED DATETIME

LOCAL DATETIME

DURATION

POINT

NODE**

RELATIONSHIP**

PATH**

* null 标记不是一种类型,而是值缺失的占位符。有关如何在 Cypher 中使用 null 的信息,请参阅Cypher 手册 → 使用 null

** 节点、关系和路径在结果中作为原始图实体的快照传递。虽然原始实体 ID 包含在这些快照中,但与底层服务器端实体之间没有保留永久链接,这些实体可能会独立于客户端副本被删除或以其他方式更改。图结构不能用作参数,因为它取决于应用程序上下文,即此类参数是按引用传递还是按值传递,而 Cypher 没有提供表示此点的机制。通过简单地传递按引用传递的 ID 或按值传递的属性提取映射,可以实现等效功能。

Neo4j 驱动程序将 Cypher 类型与原生语言类型进行映射,如下表所示。自定义类型(语言或标准库中不提供的类型)以粗体显示。

表 1. Neo4j 类型到 .NET 类型的映射
Neo4j Cypher 类型 .NET 类型

NULL

null

LIST

IList<object>

MAP

IDictionary<string, object>

BOOLEAN

bool

INTEGER

long

FLOAT

double

STRING

string

ByteArray

byte[]

DATE

LocalDate

ZONED TIME

OffsetTime

LOCAL TIME

LocalTime

ZONED DATETIME*

ZonedDateTime

LOCAL DATETIME

LocalDateTime

DURATION

Duration

POINT

Point

NODE

INode

RELATIONSHIP

IRelationship

PATH

IPath

* 时区名称遵循 IANA 系统,而非 Windows 系统。入站转换使用 Unicode CLDR 定义的扩展 Windows-Olson zid 映射进行。

异常和错误处理

当执行 Cypher 或使用驱动程序执行其他操作时,可能会出现某些异常和错误情况。服务器生成的异常都与一个状态码关联,该状态码描述了问题的性质,并附带一条提供更多详细信息的消息。

分类如下表所示。

表 2. 服务器状态码分类
分类 描述

客户端错误

客户端应用程序导致了错误。应用程序应修改并重试操作。

数据库错误

服务器导致了错误。重试操作通常不会成功。

瞬态错误

发生了临时错误。应用程序应重试操作。

服务不可用

当驱动程序在重试后仍无法与服务器建立通信时,将发出服务不可用异常信号。

遇到这种情况通常表示存在基本的网络或数据库问题。

虽然驱动程序可以采取某些缓解措施来避免此问题,但总会有无法避免的情况。因此,强烈建议确保客户端应用程序包含在客户端无法与服务器通信时可以遵循的代码路径。

瞬态错误

瞬态错误是那些由服务器生成并标记为可以安全重试而无需更改原始请求的错误。此类错误的示例包括死锁和内存问题。

使用事务函数时,当发生瞬态故障时,驱动程序通常能够自动重试。

异常属性 CanBeRetried 提供了关于后续尝试是否可能成功的见解。

© . All rights reserved.