Cypher 工作流

本节介绍如何创建工作单元并为该工作提供逻辑上下文。

概述

Neo4j 驱动程序公开了 Cypher 通道,可以通过该通道执行数据库工作(有关 Cypher 查询语言的更多信息,请参阅Cypher 手册)。

工作本身组织成**会话**、**事务**和**查询**,如下所示

sessions queries transactions
图 1. 会话、查询和事务

会话始终绑定到**单个事务上下文**,通常是单个数据库。

使用书签机制,会话还保证了事务排序的正确性,即使事务发生在多个集群成员之间也是如此。此效果称为**因果链**

会话

会话是**用于因果链事务序列的轻量级容器**(请参阅操作手册 → 因果一致性)。它们本质上提供了用于以书签形式存储事务排序信息的上下文。

当事务开始时,包含它的会话从驱动程序连接池中获取一个连接。在提交(或回滚)事务时,会话会再次释放该连接。这意味着只有当会话正在执行工作时,它才会占用连接资源。空闲时,不会使用此类资源。

由于会话保证了排序,因此会话一次只能承载**一个事务**。对于并行执行,应使用多个会话。在线程安全成为问题的语言中,**会话不应被视为线程安全**。

关闭会话会强制回滚任何打开的事务,并将其关联的连接相应地释放回池中。

会话绑定到单个事务上下文,在构造时指定。**Neo4j**在其自己的上下文中公开每个数据库,从而通过设计禁止跨数据库事务(或会话)。同样,**绑定到不同数据库的会话不能通过在它们之间传播书签来因果链接**。

各个语言驱动程序提供多个会话类,每个类都面向特定的编程风格。每个会话类都提供类似的功能集,但为客户端应用程序提供了一种基于应用程序结构和使用的框架(如果有)的选择。

会话类在会话 API中进行了描述。有关更多详细信息,请参阅API 文档

事务

事务是包含一个或多个**Cypher 查询**的**原子工作单元**。事务可能包含读或写工作,并且通常会路由到合适的服务器以执行,在那里它们将完整地执行。如果事务失败,则需要从头开始重试事务。这是**事务管理器**的责任。

Neo4j 驱动程序通过**事务函数**机制提供**事务管理**。此机制通过 Session 对象上的方法公开,这些方法接受一个函数对象,该函数对象可以多次针对不同的服务器播放,直到它成功或达到超时为止。对于大多数客户端应用程序,建议使用此方法。

一个方便的**简写替代方案**是**自动提交**事务机制。这为单查询事务提供了有限形式的事务管理,作为略微减少代码开销的权衡。这种形式的事务对于**快速脚本和不需要高可用性保证的环境**很有用。它也是运行Cypher 手册 → CALL {} IN TRANSACTIONS查询所需的交易形式,这些查询是唯一管理自身交易的 Cypher 查询类型。

还提供了一个更低级别的**非托管事务 API**,用于高级用例。当客户端应用了替代的事务管理层时,这很有用,在这种情况下,需要以自定义方式管理错误处理和重试。

要了解有关如何使用非托管事务的更多信息,请参阅API 文档

查询和结果

查询包括请求服务器执行 Cypher 语句,然后将结果响应回客户端。结果作为**记录流**传输,以及标题和页脚元数据,并且客户端应用程序可以增量地使用它。使用反应式功能,可以通过允许 Cypher 结果在中途暂停或取消来增强记录流的语义。

要执行 Cypher 查询,需要**查询文本**以及一组可选的**命名参数**。文本可以包含在运行时用相应的值替换的**参数占位符**。虽然可以运行非参数化的 Cypher 查询,但**良好的编程实践是在尽可能的情况下在 Cypher 查询中使用参数**。这允许在 Cypher 引擎内缓存查询,这有利于性能。参数值应遵循Cypher 值

结果摘要通常也可用。它包含与查询执行和结果内容相关的其他信息。对于EXPLAINPROFILE查询,这是返回查询计划的位置。有关这些查询的更多信息,请参阅Cypher 手册 → 分析查询

因果链和书签

在使用**因果集群**时,可以通过会话将事务链接起来,以确保**因果一致性**。这意味着对于任何两个事务,都保证第二个事务只有在第一个事务成功提交后才能开始。即使事务是在不同的物理集群成员上执行的,也适用。有关因果集群的更多信息,请参阅操作手册 → 集群

在内部,因果链接是通过在事务之间传递书签来执行的。每个书签记录特定数据库的事务历史中的一个或多个点,并且可用于告知集群成员以特定顺序执行工作单元。收到书签后,服务器将阻塞,直到它赶上相关事务时间点。

在开始新事务时,从客户端发送初始书签到服务器,并在成功完成时返回最终书签。请注意,这适用于读取和写入事务。

书签传播在会话内自动执行,不需要应用程序的任何显式信号或设置。要选择退出此机制,对于不相关的作业单元,应用程序可以使用多个会话。这避免了因果链的小延迟开销。

书签可以通过从一个会话中提取最后一个书签并将其传递到另一个会话的构造中来在会话之间传递。如果事务具有多个逻辑前驱,则还可以组合多个书签。请注意,只有在跨会话链接时,应用程序才需要直接处理书签。

driver passing bookmarks
图 2. 传递书签
示例 1. 传递书签
// Create a company node
private IResultSummary AddCompany(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Company {name: $name})", new { name }).Consume();
}

// Create a person node
private IResultSummary AddPerson(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume();
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
private IResultSummary Employ(IQueryRunner tx, string personName, string companyName)
{
    return tx.Run(
            @"MATCH (person:Person {name: $personName}) 
                 MATCH (company:Company {name: $companyName}) 
                 CREATE (person)-[:WORKS_FOR]->(company)",
            new { personName, companyName })
        .Consume();
}

// Create a friendship between two people.
private IResultSummary MakeFriends(IQueryRunner tx, string name1, string name2)
{
    return tx.Run(
            @"MATCH (a:Person {name: $name1}) 
                 MATCH (b:Person {name: $name2})
                 MERGE (a)-[:KNOWS]->(b)",
            new { name1, name2 })
        .Consume();
}

// Match and display all friendships.
private int PrintFriendships(IQueryRunner tx)
{
    var result = tx.Run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name");

    var count = 0;
    foreach (var record in result)
    {
        count++;
        Console.WriteLine($"{record["a.name"]} knows {record["b.name"]}");
    }

    return count;
}

public void AddEmployAndMakeFriends()
{
    // To collect the session bookmarks
    var savedBookmarks = new List<Bookmarks>();

    // Create the first person and employment relationship.
    using (var session1 = Driver.Session(o => o.WithDefaultAccessMode(AccessMode.Write)))
    {
        session1.ExecuteWrite(tx => AddCompany(tx, "Wayne Enterprises"));
        session1.ExecuteWrite(tx => AddPerson(tx, "Alice"));
        session1.ExecuteWrite(tx => Employ(tx, "Alice", "Wayne Enterprises"));

        savedBookmarks.Add(session1.LastBookmarks);
    }

    // Create the second person and employment relationship.
    using (var session2 = Driver.Session(o => o.WithDefaultAccessMode(AccessMode.Write)))
    {
        session2.ExecuteWrite(tx => AddCompany(tx, "LexCorp"));
        session2.ExecuteWrite(tx => AddPerson(tx, "Bob"));
        session2.ExecuteWrite(tx => Employ(tx, "Bob", "LexCorp"));

        savedBookmarks.Add(session2.LastBookmarks);
    }

    // Create a friendship between the two people created above.
    using (var session3 = Driver.Session(
               o =>
                   o.WithDefaultAccessMode(AccessMode.Write).WithBookmarks(savedBookmarks.ToArray())))
    {
        session3.ExecuteWrite(tx => MakeFriends(tx, "Alice", "Bob"));

        session3.ExecuteRead(PrintFriendships);
    }
}

使用访问模式路由事务

事务可以在**读取**或**写入**模式下执行;这称为**访问模式**。在因果集群中,每个事务将根据模式路由到相应的服务器。当使用单个实例时,所有事务都将传递到该服务器。

通过识别读写操作来路由 Cypher 可以提高可用集群资源的利用率。由于读取服务器通常比写入服务器多,因此将读取流量定向到读取服务器而不是写入服务器是有益的。这样做有助于保持写入服务器可用于写入事务。

**访问模式通常由用于调用事务函数的方法指定。** 会话类提供了一种调用读取的方法和另一种调用写入的方法。

作为**自动提交**和**非托管事务**的回退,还可以**在会话级别**提供**默认访问模式**。这仅在无法以其他方式指定访问模式的情况下使用。**如果在该会话中使用事务函数,则默认访问模式将被覆盖**。

驱动程序不解析**Cypher**,因此无法自动确定事务是否打算执行读取或写入操作。因此,标记为读取的写入事务仍将发送到读取服务器,但在执行时会失败。
示例 2. 读写事务
public long AddPerson(string name)
{
    using var session = Driver.Session();
    session.ExecuteWrite(tx => CreatePersonNode(tx, name));
    return session.ExecuteRead(tx => MatchPersonNode(tx, name));
}

private static IResultSummary CreatePersonNode(IQueryRunner tx, string name)
{
    return tx.Run("CREATE (a:Person {name: $name})", new { name }).Consume();
}

private static long MatchPersonNode(IQueryRunner tx, string name)
{
    var result = tx.Run("MATCH (a:Person {name: $name}) RETURN id(a)", new { name });
    return result.Single()[0].As<long>();
}

数据库和执行上下文

**Neo4j** 提供了在同一 DBMS 中使用**多个数据库**的功能。

对于**社区版**,这仅限于**一个用户数据库**,加上system数据库。

从驱动程序 API 的角度来看,会话具有 DBMS 范围,并且可以在会话构造时选择会话的默认数据库。默认数据库用作不使用USE子句显式指定数据库的查询的目标。有关USE子句的详细信息,请参阅Cypher 手册→USE

在多数据库环境中,**服务器将一个数据库标记为默认数据库**。在没有将特定数据库命名为默认数据库的情况下创建会话时,将选择此数据库。在只有一个数据库的环境中,该数据库始终是默认数据库。

有关在同一 DBMS 中管理多个数据库的更多信息,请参阅Cypher 手册→Neo4j 数据库和图,其中全面分解了**Neo4j**的数据存储层次结构。

以下示例说明了如何使用数据库

var session = driver.session(SessionConfig.forDatabase( "foo" ))
// lists nodes from database foo
session.run("MATCH (n) RETURN n").list()

// lists nodes from database bar
session.run("USE bar MATCH (n) RETURN n").list()

// creates an index in foo
session.run("CREATE INDEX foo_idx FOR (n:Person) ON n.id").consume()

// creates an index in bar
session.run("USE bar CREATE INDEX bar_idx FOR (n:Person) ON n.id").consume()

// targets System database
session.run("SHOW USERS")

数据库选择

在会话创建期间,将数据库的名称传递给驱动程序。如果未指定名称,则使用默认数据库。数据库名称不能为null,也不能为空字符串。

**仅当驱动程序连接到 Neo4j 企业版时,才能选择数据库**。在 Neo4j 社区版中更改为除默认数据库以外的任何其他数据库都将导致运行时错误。

以下示例说明了 DBMS Cypher 手册→事务的概念,并展示了如何在单个驱动程序事务中发出对多个数据库的查询。它使用注释描述了每个操作如何影响数据库事务。

var session = driver.session(SessionConfig.forDatabase( "foo" ))
// a DBMS-level transaction is started
var transaction = session.begin()

// a transaction on database "foo" is started automatically with the first query targeting foo
transaction.run("MATCH (n) RETURN n").list()

// a transaction on database "bar" is started
transaction.run("USE bar MATCH (n) RETURN n").list()

// executes in the transaction on database "foo"
transaction.run("RETURN 1").consume()

// executes in the transaction on database "bar"
transaction.run("USE bar RETURN 1").consume()

// commits the DBMS-level transaction which commits the transactions on databases "foo" and
// "bar"
transaction.commit()

请注意,请求的数据库必须存在。

示例 3. 会话创建时的数据库选择
using (var session = _driver.Session(SessionConfigBuilder.ForDatabase("examples")))
{
    session.Run("CREATE (a:Greeting {message: 'Hello, Example-Database'}) RETURN a").Consume();
}

void SessionConfig(SessionConfigBuilder configBuilder)
{
    configBuilder.WithDatabase("examples")
        .WithDefaultAccessMode(AccessMode.Read)
        .Build();
}

using (var session = _driver.Session(SessionConfig))
{
    var result = session.Run("MATCH (a:Greeting) RETURN a.message as msg");
    var msg = result.Single()[0].As<string>();
    Console.WriteLine(msg);
}

类型映射

驱动程序在**应用程序语言类型**和**Cypher 类型**之间进行转换。

要传递参数和处理结果,了解 Cypher 如何使用类型以及 Cypher 类型如何在驱动程序中映射的基本知识非常重要。

下表显示了可用的数据类型。所有类型都可能在结果中找到,尽管并非所有类型都可以用作参数。

Cypher 类型 参数 结果

NULL*

LIST

MAP

BOOLEAN

INTEGER

FLOAT

STRING

ByteArray

DATE

ZONED TIME

LOCAL TIME

ZONED DATETIME

LOCAL DATETIME

DURATION

POINT

NODE**

RELATIONSHIP**

PATH**

* 空标记不是类型,而是值不存在的占位符。有关如何在 Cypher 中使用空的详细信息,请参阅Cypher 手册→使用null

**节点、关系和路径作为原始图实体的快照传递到结果中。虽然这些快照中包含原始实体 ID,但不会保留回底层服务器端实体的永久链接,这些实体可能会独立于客户端副本被删除或以其他方式更改。图结构不能用作参数,因为它取决于应用程序上下文此类参数是通过引用传递还是通过值传递,并且 Cypher 没有机制来表示这一点。等效的功能可以通过简单地传递 ID 进行按引用传递,或传递属性的提取映射进行按值传递来实现。**

Neo4j 驱动程序将 Cypher 类型映射到本机语言类型,反之亦然,如下表所示。自定义类型(语言或标准库中不可用的类型)以**粗体**突出显示。

表 1. 将 Neo4j 类型映射到 .NET 类型
Neo4j Cypher 类型 .NET 类型

NULL

null

LIST

IList<object>

MAP

IDictionary<string, object>

BOOLEAN

bool

INTEGER

long

FLOAT

double

STRING

string

ByteArray

byte[]

DATE

LocalDate

ZONED TIME

OffsetTime

LOCAL TIME

LocalTime

ZONED DATETIME*

ZonedDateTime

LOCAL DATETIME

LocalDateTime

DURATION

Duration

POINT

Point

NODE

INode

RELATIONSHIP

IRelationship

PATH

IPath

* 时区名称遵循IANA 系统,而不是Windows 系统。入站转换使用 Unicode CLDR 定义的扩展 Windows-Olson zid 映射执行。

异常和错误处理

在执行**Cypher**或使用驱动程序执行其他操作时,可能会出现某些异常和错误情况。**服务器生成的异常**都与**状态代码**相关联,该代码描述了问题的性质,并提供更详细的消息。

分类如下表所示。

表 2. 服务器状态代码分类
分类 描述

ClientError

客户端应用程序已导致错误。应用程序应修改并重试操作。

DatabaseError

服务器已导致错误。重试操作通常不会成功。

TransientError

发生了临时错误。应用程序应重试操作。

服务不可用

当驱动程序即使在重试后也无法再与服务器建立通信时,将发出**服务不可用**异常。

遇到此情况通常表示网络或数据库存在基本问题。

虽然驱动程序可以进行某些缓解措施以避免此问题,但始终存在无法避免的情况。因此,强烈建议确保客户端应用程序包含一个代码路径,以便在客户端无法再与服务器通信时遵循。

瞬态错误

瞬态错误是由服务器生成并标记为可以安全重试而无需更改原始请求的错误。此类错误的示例包括死锁和内存问题。

在使用事务函数时,驱动程序通常能够在发生瞬态故障时自动重试。

异常属性CanBeRetried提供有关进一步尝试是否可能成功的见解。