运行您自己的事务
当使用 executableQuery()
查询数据库时,驱动程序会自动创建一个 事务。事务是一个工作单元,它要么整体提交,要么在失败时回滚。您可以在一个查询中包含多个 Cypher 语句,例如在使用 MATCH
和 CREATE
顺序更新数据库时,但您不能有多个查询并在它们之间交错一些客户端逻辑。
对于这些更高级的用例,驱动程序提供了函数来完全控制事务生命周期。这些被称为托管事务,您可以将其视为解开 executableQuery()
流程并能够在更多地方指定其所需行为的方式。
创建会话
在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间具体的查询通道,并确保因果一致性得到执行。
会话是使用方法 Driver.session()
创建的。使用可选参数更改会话的配置,例如目标数据库。有关更多配置参数,请参阅会话配置。
// import org.neo4j.driver.SessionConfig
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
// session usage
}
会话创建是一个轻量级操作,因此创建和销毁会话的成本不高。完成后,请务必关闭会话。
会话不是线程安全的:您可以在线程之间共享主 Driver
对象,但请确保每个线程创建自己的会话。
运行托管事务
一个事务可以包含任意数量的查询。由于 Neo4j 符合 ACID 标准,事务中的查询将要么全部执行,要么根本不执行:您不能让事务的一部分成功而另一部分失败。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。
托管事务是使用方法 Session.executeRead()
和 Session.executeWrite()
创建的,具体取决于您是想从数据库中检索数据还是修改数据。这两种方法都接受一个事务函数回调,该回调负责实际执行查询和处理结果。
Al
开头的人。// import java.util.Map
// import org.neo4j.driver.SessionConfig
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) { (1)
var people = session.executeRead(tx -> { (2)
var result = tx.run("""
MATCH (p:Person) WHERE p.name STARTS WITH $filter (3)
RETURN p.name AS name ORDER BY name
""", Map.of("filter", "Al"));
return result.list(); // return a list of Record objects (4)
});
people.forEach(person -> {
System.out.println(person);
});
// further tx.run() calls will execute within the same transaction
}
1 | 创建会话。单个会话可以包含多个查询。除非使用 try 构造创建为资源,否则请记住在完成后关闭它。 |
2 | .executeRead() (或 .executeWrite() )方法是事务的入口点。它接受一个回调给事务函数,该函数负责运行查询。 |
3 | 使用方法 tx.run() 执行查询。您可以提供一个查询参数映射作为第二个参数。每次查询运行都返回一个 Result 对象。 |
4 | 使用 Result 上的任何方法处理结果。.list() 方法将所有记录检索到列表中。 |
不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数不应直接返回 Result
对象。相反,始终以某种方式处理结果。在事务函数中,return
语句会导致事务被提交,而如果抛出异常,事务将自动回滚。
.executeRead() 和 .executeWrite() 方法已取代 .readTransaction() 和 .writeTransaction() ,后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。 |
package demo;
import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.Record;
import org.neo4j.driver.RoutingControl;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionContext;
import org.neo4j.driver.exceptions.NoSuchRecordException;
public class App {
// Create & employ 100 people to 10 different organizations
public static void main(String... args) {
final String dbUri = "{neo4j-database-uri}";
final String dbUser = "{neo4j-username}";
final String dbPassword = "{neo4j-password}";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
for (int i=0; i<100; i++) {
String name = String.format("Thor%d", i);
try {
String orgId = session.executeWrite(tx -> employPersonTx(tx, name));
System.out.printf("User %s added to organization %s.%n", name, orgId);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
}
}
static String employPersonTx(TransactionContext tx, String name) {
final int employeeThreshold = 10;
// Create new Person node with given name, if not exists already
tx.run("MERGE (p:Person {name: $name})", Map.of("name", name));
// Obtain most recent organization ID and the number of people linked to it
var result = tx.run("""
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
""");
Record org = null;
String orgId = null;
int employeesN = 0;
try {
org = result.single();
orgId = org.get("id").asString();
employeesN = org.get("employeesN").asInt();
} catch (NoSuchRecordException e) {
// The query is guaranteed to return <= 1 results, so if.single() throws, it means there's none.
// If no organization exists, create one and add Person to it
orgId = createOrganization(tx);
System.out.printf("No orgs available, created %s.%n", orgId);
}
// If org does not have too many employees, add this Person to it
if (employeesN < employeeThreshold) {
addPersonToOrganization(tx, name, orgId);
// If the above throws, the transaction will roll back
// -> not even Person is created!
// Otherwise, create a new Organization and link Person to it
} else {
orgId = createOrganization(tx);
System.out.printf("Latest org is full, created %s.%n", orgId);
addPersonToOrganization(tx, name, orgId);
// If any of the above throws, the transaction will roll back
// -> not even Person is created!
}
return orgId; // Organization ID to which the new Person ends up in
}
static String createOrganization(TransactionContext tx) {
var result = tx.run("""
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
RETURN o.id AS id
""");
var org = result.single();
var orgId = org.get("id").asString();
return orgId;
}
static void addPersonToOrganization(TransactionContext tx, String personName, String orgId) {
tx.run("""
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[:WORKS_FOR]->(o)
""", Map.of("orgId", orgId, "name", personName)
);
}
}
如果事务因驱动程序认为是瞬态的原因而失败,它会自动重试运行事务函数(延迟呈指数级增长)。因此,事务函数必须是幂等的(即,它们在多次运行时应产生相同的效果),因为您无法事先知道它们将执行多少次。实际上,这意味着您不应编辑或依赖全局变量,例如。请注意,虽然事务函数可能会执行多次,但其中的查询将始终只运行一次。
一个会话可以链接多个事务,但在任何给定时间,一个会话中只能有一个事务处于活动状态。要维护多个并发事务,请使用多个并发会话。
传递给
|
运行显式事务
您可以通过使用方法 Session.beginTransaction()
手动开始一个事务,从而实现对事务的完全控制,该方法返回一个 Transaction
对象。然后,您可以使用方法 Transaction.run()
在显式事务中运行查询。
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
try (Transaction tx = session.beginTransaction()) {
// use tx.run() to run queries
// tx.commit() to commit the transaction
// tx.rollback() to rollback the transaction
}
}
显式事务可以使用 Transaction.commit()
提交,或使用 Transaction.rollback()
回滚。如果没有采取显式操作,驱动程序将在事务生命周期结束时自动回滚事务。
显式事务最适用于需要将 Cypher 执行分布到同一事务的多个函数中的应用程序,或者需要在一个事务中运行多个查询但不需要托管事务提供的自动重试功能的应用程序。
package demo;
import java.util.Map;
import java.util.List;
import java.util.Arrays;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Transaction;
public class App {
public static void main(String... args) {
final String dbUri = "{neo4j-database-uri}";
final String dbUser = "{neo4j-username}";
final String dbPassword = "{neo4j-password}";
try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
driver.verifyConnectivity();
String customerId = createCustomer(driver);
int otherBankId = 42;
transferToOtherBank(driver, customerId, otherBankId, 999);
}
}
static String createCustomer(Driver driver) {
var result = driver.executableQuery("""
MERGE (c:Customer {id: randomUUID(), balance: 1000})
RETURN c.id AS id
""")
.withConfig(QueryConfig.builder().withDatabase("neo4j").build())
.execute();
return result.records().get(0).get("id").asString();
}
static void transferToOtherBank(Driver driver, String customerId, int otherBankId, float amount) {
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
try (var tx = session.beginTransaction()) {
if (! customerBalanceCheck(tx, customerId, amount)) {
System.out.printf("Customer %s doesn't have enough funds.%n", customerId);
return; // give up
}
otherBankTransferApi(customerId, otherBankId, amount);
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
try {
decreaseCustomerBalance(tx, customerId, amount);
tx.commit();
System.out.printf("Transferred %f to %s.%n", amount, customerId);
} catch (Exception e) {
requestInspection(customerId, otherBankId, amount, e);
throw new RuntimeException(e.getMessage());
}
}
}
}
static boolean customerBalanceCheck(Transaction tx, String customerId, float amount) {
var result = tx.run("""
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
""", Map.of("id", customerId, "amount", amount));
var record = result.single();
return record.get("sufficient").asBoolean();
}
static void otherBankTransferApi(String customerId, int otherBankId, float amount) {
// make some API call to other bank
}
static void decreaseCustomerBalance(Transaction tx, String customerId, float amount) {
tx.run("""
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
""", Map.of("id", customerId, "amount", amount));
}
static void requestInspection(String customerId, int otherBankId, float amount, Exception e) {
// manual cleanup required; log this or similar
System.out.printf("WARNING: transaction rolled back due to exception: %s.%n", e.getMessage());
System.out.printf("customerId: %s, otherBankId: %d, amount: %f.%n", customerId, otherBankId, amount);
}
}
处理查询结果
驱动程序查询的输出是一个 Result
对象,它将 Cypher 结果封装在一个丰富的数据结构中,需要在客户端进行一些解析。需要注意两个主要点:
-
结果记录不会立即完全从服务器获取并返回。相反,结果以惰性流的形式出现。特别是,当驱动程序从服务器接收到一些记录时,它们最初会被缓冲在后台队列中。记录会保留在缓冲区中,直到应用程序消费它们,此时它们会从缓冲区中移除。当没有更多记录可用时,结果就耗尽了。
-
结果充当游标。这意味着无法从流中检索先前的记录,除非您将其保存在辅助数据结构中。
下面的动画展示了单个查询的路径:它展示了驱动程序如何处理结果记录以及应用程序应如何处理结果。
处理结果最简单的方法是调用其上的 .list()
,它会生成一个 Record
对象列表。此外,Result
对象实现了许多处理记录的方法。下面列出了最常用的方法。
方法 | 描述 |
---|---|
|
将结果的剩余部分作为列表返回。 |
|
返回下一个且唯一剩余的记录。调用此方法总是会耗尽结果。如果可用记录多于(或少于)一个,则会抛出 |
|
返回结果中的下一条记录。如果没有更多记录可用,则抛出 |
|
结果迭代器是否有下一条记录可移动。 |
|
从结果中返回下一条记录,而不消费它。这会将记录留在缓冲区中以供进一步处理。 |
|
返回查询结果摘要。它会耗尽结果,因此只应在数据处理结束后调用。 |
有关 Result
方法的完整列表,请参阅API 文档 → Result。
有关更多信息,请参阅数据类型与 Cypher 类型映射。 |
会话配置
数据库选择
建议始终通过 .withDatabase("<dbName>")
方法显式指定数据库,即使在单数据库实例上也是如此。这使得驱动程序能够更高效地工作,因为它节省了一次到服务器解析主数据库的网络往返。如果没有给定数据库,则使用 Neo4j 实例设置中设置的默认数据库。
// import org.neo4j.driver.SessionConfig;
var session = driver.session(SessionConfig.builder()
.withDatabase("neo4j").build());
通过配置方法指定数据库优于 USE Cypher 子句。如果服务器在集群上运行,带有 USE 的查询需要启用服务器端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含所请求数据库的成员。 |
请求路由
在集群环境中,所有会话都以写入模式打开,将其路由到主节点。您可以通过调用方法 .withRouting(RoutingControl.READ)
来更改此设置。请注意,.executeRead()
和 .executeWrite()
会自动覆盖会话的默认访问模式。
// import org.neo4j.driver.SessionConfig;
// import org.neo4j.driver.AccessMode;
var session = driver.session(SessionConfig.builder()
.withDatabase("neo4j")
.withDefaultAccessMode(AccessMode.READ)
.build());
尽管在读取模式下执行写入查询很可能会导致运行时错误,但您不应依赖此进行访问控制。两种模式的区别在于,读取事务被路由到集群的任何节点,而写入事务被定向到主节点。换句话说,不能保证在读取模式下提交的写入查询会被拒绝。 类似的注意事项也适用于 |
以不同用户身份运行查询
您可以通过在会话创建时提供 AuthToken 作为第三个参数,以不同用户的身份执行查询。在会话级别切换用户比创建新的 Driver
对象更经济。查询随后将在给定用户的安全上下文(即主数据库、权限等)中运行。
会话范围的身份验证要求服务器版本 >= 5.8。
// import org.neo4j.driver.AuthTokens;
// import org.neo4j.driver.Session;
// import org.neo4j.driver.SessionConfig;
var authToken = AuthTokens.basic("somebodyElse", "theirPassword");
var session = driver.session(
Session.class,
SessionConfig.builder()
.withDatabase("neo4j")
.build(),
authToken
);
.withImpersonatedUser()
方法提供了类似的功能,并且在驱动程序/服务器版本 >= 4.4 中可用。区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver
所用的用户需要具有适当的权限。
// import org.neo4j.driver.SessionConfig;
var session = driver.session(SessionConfig.builder()
.withDatabase("neo4j")
.withImpersonatedUser("somebodyElse")
.build());
事务配置
您可以通过向 .executeRead()
、.executeWrite()
和 .beginTransaction()
提供一个 TransactionConfig
对象作为(可选)第二个参数,从而对事务施加进一步的控制。使用它来指定
-
事务超时。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒。
-
附加到事务的元数据映射。这些元数据会记录在服务器的
query.log
中,并在SHOW TRANSACTIONS
Cypher 命令的输出中可见。使用此功能标记事务。
// import java.time.Duration
// import org.neo4j.driver.SessionConfig
// import org.neo4j.driver.TransactionConfig
try (var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build())) {
var people = session.executeRead(tx -> {
var result = tx.run("MATCH (p:Person) RETURN p");
return result.list(); // return a list of Record objects
}, TransactionConfig.builder()
.withTimeout(Duration.ofSeconds(5))
.withMetadata(Map.of("appName", "peopleTracker"))
.build()
);
people.forEach(person -> System.out.println(person));
}
关闭会话
每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话。因此,建议使用 try-with-resources
语句创建会话,这样当应用程序使用完会话后,它们会自动关闭。当会话关闭时,它会被返回到连接池以供以后重用。
如果您不使用 try
将会话作为资源打开,请记住在完成使用后调用 .close()
方法。
var session = driver.session(SessionConfig.builder().withDatabase("neo4j").build());
// session usage
session.close();
词汇表
- LTS
-
长期支持版本是指保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但专用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个函数库(有很多),这些函数本身无法在 Cypher 中轻松表达。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互使用的协议。它默认监听端口 7687。
- ACID
-
原子性、一致性、隔离性、持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 标准的 DBMS 确保数据库中的数据尽管发生故障也能保持准确和一致。
- 最终一致性
-
如果数据库提供所有集群成员在某个时间点将存储最新版本数据的保证,则该数据库是最终一致的。
- 因果一致性
-
如果读取和写入查询被集群的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。
- NULL
-
空标记不是类型,而是值缺失的占位符。欲了解更多信息,请参阅Cypher → 使用
null
。 - 事务
-
事务是一个工作单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免钱从一个账户中扣除但未添加到另一个账户的情况发生。
- 背压
-
背压是一种阻碍数据流动的力。它确保客户端不会被比其处理能力更快的数据淹没。
- 事务函数
-
事务函数是由
executeRead
或executeWrite
调用执行的回调。在服务器故障的情况下,驱动程序会自动重新执行该回调。 - Driver
-
一个
Driver
对象保存了建立与 Neo4j 数据库连接所需的详细信息。