事务中的 CALL 子查询
CALL
子查询可以执行在单独的内部事务中,生成中间提交。这在执行大型写入操作(如批量更新、导入和删除)时非常有用。
要在单独的事务中执行 CALL
子查询,请在子查询后添加修饰符 IN TRANSACTIONS
。将打开一个外部事务以报告内部事务的累积统计信息(创建和删除的节点、关系等),并且它将根据这些内部事务的结果成功或失败。默认情况下,内部事务将一起分组 1000 行的批次。取消外部事务也将取消内部事务。
CALL { … } IN TRANSACTIONS 仅允许在隐式事务中使用。如果您使用的是Neo4j 浏览器,则必须在使用 CALL { … } IN TRANSACTIONS 的任何查询前添加 :auto 。 |
此页面上的示例使用变量作用域子句(在 Neo4j 5.23 中引入)将变量导入 CALL 子查询。如果您使用的是旧版本的 Neo4j,请改用导入 WITH 子句。 |
语法
CALL {
subQuery
} IN [[concurrency] CONCURRENT] TRANSACTIONS
[OF batchSize ROW[S]]
[REPORT STATUS AS statusVar]
[ON ERROR {CONTINUE | BREAK | FAIL}];
加载 CSV 数据
此示例使用 CSV 文件和 LOAD CSV
子句将数据导入数据库。它使用 CALL { … } IN TRANSACTIONS
在单独的事务中创建节点
1,Bill,26
2,Max,27
3,Anna,22
4,Gladys,29
5,Summer,24
LOAD CSV FROM 'file:///friends.csv' AS line
CALL (line) {
CREATE (:Person {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS
|
行数:0 |
由于此示例中的 CSV 文件大小很小,因此只启动并提交了一个单独的事务。
删除大量数据
使用 CALL { … } IN TRANSACTIONS
是删除大量数据的推荐方法。
MATCH (n)
CALL (n) {
DETACH DELETE n
} IN TRANSACTIONS
|
行数:0 |
CALL { … } IN TRANSACTIONS
子查询不应修改。
任何必要的过滤都可以在子查询之前完成。
MATCH (n:Label) WHERE n.prop > 100
CALL (n) {
DETACH DELETE n
} IN TRANSACTIONS
|
行数:0 |
批处理是在馈送到 |
批处理
可以根据在提交当前事务并启动新事务之前要处理多少输入行来指定每个单独事务中要执行的工作量。输入行数由修饰符 OF n ROWS
(或 OF n ROW
)设置。如果省略,则默认批处理大小为 1000
行。行数可以使用任何计算结果为正整数且不引用节点或关系的表达式来表示。
此示例加载一个 CSV 文件,每个 2
个输入行一个事务
1,Bill,26
2,Max,27
3,Anna,22
4,Gladys,29
5,Summer,24
LOAD CSV FROM 'file:///friends.csv' AS line
CALL (line) {
CREATE (:Person {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS OF 2 ROWS
|
行数:0 |
查询现在启动并提交了三个单独的事务
-
子查询的前两次执行(对于来自
LOAD CSV
的前两行输入)发生在第一个事务中。 -
然后在继续之前提交第一个事务。
-
子查询的接下来的两次执行(对于接下来的两行输入)发生在第二个事务中。
-
提交第二个事务。
-
子查询的最后一次执行(对于最后一行输入)发生在第三个事务中。
-
提交第三个事务。
您还可以使用 CALL { … } IN TRANSACTIONS OF n ROWS
按批次删除所有数据,以避免巨大的垃圾回收或 OutOfMemory
异常。例如
MATCH (n)
CALL (n) {
DETACH DELETE n
} IN TRANSACTIONS OF 2 ROWS
|
行数:0 |
在一定范围内,使用更大的批量大小会提高性能。给定此处使用的小数据集, |
复合数据库
从 Neo4j 5.18 开始,CALL { … } IN TRANSACTIONS
可以与复合数据库一起使用。
即使复合数据库允许在单个查询中访问多个图,但单个事务只能修改一个图。CALL { … } IN TRANSACTIONS
提供了一种构建修改多个图的查询的方法。
虽然前面的示例通常对复合数据库有效,但在子查询中使用复合数据库时,还有一些额外的因素需要考虑。以下示例展示了如何在复合数据库上使用 CALL { … } IN TRANSACTIONS
。
1,Bill,26
2,Max,27
3,Anna,22
4,Gladys,29
5,Summer,24
Person
节点,从 friends.csv 中提取数据。UNWIND graph.names() AS graphName
LOAD CSV FROM 'file:///friends.csv' AS line
CALL (*) {
USE graph.byName( graphName )
CREATE (:Person {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS
UNWIND graph.names() AS graphName
CALL {
USE graph.byName( graphName )
MATCH (n)
RETURN elementId(n) AS id
}
CALL {
USE graph.byName( graphName )
WITH id
MATCH (n)
WHERE elementId(n) = id
DETACH DELETE n
} IN TRANSACTIONS
由于批处理是在馈送到 CALL { … } IN TRANSACTIONS 的输入行上执行的,因此必须从子查询外部提供数据才能使批处理生效。这就是为什么节点在先于实际删除数据的子查询中匹配的原因。如果 MATCH 子句位于第二个子查询内,则数据删除将作为一个单一事务运行。 |
目前存在一个已知问题。当在 |
复合数据库中的批量大小
由于针对不同图的 CALL { … } IN TRANSACTIONS
子查询不能交错,因此,如果 USE
子句计算出的目标与当前目标不同,则当前批次将被提交,并创建下一个批次。
使用 IN TRANSACTIONS OF … ROWS
声明的批量大小表示批量大小的上限,但实际的批量大小取决于有多少输入行按顺序针对一个数据库。每次目标数据库更改时,批次都会被提交。
IN TRANSACTIONS OF ROWS
在复合数据库上的行为下一个示例假设复合数据库 composite
存在两个组成部分 remoteGraph1
和 remoteGraph2
。
虽然声明的批量大小为 3,但只有前 2 行作用于 composite.remoteGraph1
,因此第一个事务的批量大小为 2。接下来是 composite.remoteGraph2
上的 3 行,composite.remoteGraph2
上的 1 行,最后是 composite.remoteGraph1
上的 2 行。
WITH ['composite.remoteGraph1', 'composite.remoteGraph2'] AS graphs
UNWIND [0, 0, 1, 1, 1, 1, 0, 0] AS i
WITH graphs[i] AS g
CALL (g) {
USE graph.byName( g )
CREATE ()
} IN TRANSACTIONS OF 3 ROWS
错误行为
用户可以选择三个不同的选项标志之一来控制在 CALL { … } IN TRANSACTIONS
的任何内部事务发生错误时的情况下的行为。
-
ON ERROR CONTINUE
忽略可恢复错误并继续执行后续的内部事务。外部事务成功。它将导致失败的内部查询的预期变量对于该特定事务绑定为 null。 -
ON ERROR BREAK
忽略可恢复错误并停止执行后续的内部事务。外部事务成功。它将导致失败的内部查询的预期变量对于所有后续事务(包括失败的事务)绑定为 null。 -
ON ERROR FAIL
确认可恢复错误并停止执行后续的内部事务。外部事务失败。如果未显式指定任何标志,则这是默认行为。
发生错误时,任何先前已提交的内部事务都将保持提交状态,并且不会回滚。任何失败的内部事务都将回滚。 |
在以下示例中,第二个内部事务中的最后一个子查询执行由于除以零而失败。
UNWIND [4, 2, 1, 0] AS i
CALL (i) {
CREATE (:Person {num: 100/i})
} IN TRANSACTIONS OF 2 ROWS
RETURN i
/ by zero (Transactions committed: 1)
当故障发生时,第一个事务已经提交,因此数据库包含两个示例节点。
MATCH (e:Person)
RETURN e.num
e.num |
---|
|
|
行数:2 |
在以下示例中,在失败的内部事务之后使用 ON ERROR CONTINUE
来执行剩余的内部事务,并且不使外部事务失败。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR CONTINUE
RETURN n.num;
n.num |
---|
|
|
|
|
行数:4 |
请注意,在 2 行的事务中进行批处理时结果的差异。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 2 ROWS
ON ERROR CONTINUE
RETURN n.num;
n.num |
---|
|
|
|
|
行数:4 |
发生这种情况是因为创建了一个包含前两个 i
元素(1 和 0)的内部事务,并且它在 0 处失败。这会导致它回滚,并且返回变量将为这两个元素填充 null。
在以下示例中,在失败的内部事务之后使用 ON ERROR BREAK
来不执行剩余的内部事务,并且不使外部事务失败。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR BREAK
RETURN n.num;
n.num |
---|
|
|
|
|
行数:4 |
请注意,在 2 行的事务中进行批处理时结果的差异。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 2 ROWS
ON ERROR BREAK
RETURN n.num;
n.num |
---|
|
|
|
|
行数:4 |
在以下示例中,在失败的内部事务之后使用 ON ERROR FAIL
来不执行剩余的内部事务,并使外部事务失败。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR FAIL
RETURN n.num;
/ by zero (Transactions committed: 1)
状态报告
用户还可以通过使用 REPORT STATUS AS var
来报告内部事务的执行状态。此标志不允许用于 ON ERROR FAIL
。有关更多信息,请参阅错误行为。
每次内部查询的执行完成(成功或失败)后,都会创建一个状态值,用于记录有关执行和执行它的事务的信息。
-
如果内部执行生成一个或多个行作为输出,则在选定的变量名称下,将向每一行添加对该状态值的绑定。
-
如果内部执行失败,则会生成一行,其中包含在选定的变量下对该状态值的绑定,以及所有应由内部查询返回的变量的 null 绑定(如果有)。
状态值是一个具有以下字段的映射值。
-
started
:当内部事务启动时为true
,否则为false
。 -
committed
,当内部事务更改成功提交时为true
,否则为false
。 -
transactionId
:内部事务 ID,如果事务未启动则为null
。 -
errorMessage
,内部事务错误消息,如果未发生错误则为null
。
使用 ON ERROR CONTINUE
报告状态的示例
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR CONTINUE
REPORT STATUS AS s
RETURN n.num, s;
n.num | s |
---|---|
|
|
|
|
|
|
|
|
行数:4 |
使用 ON ERROR BREAK
报告状态的示例
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR BREAK
REPORT STATUS AS s
RETURN n.num, s.started, s.committed, s.errorMessage;
n.num | s.started | s.committed | s.errorMessage |
---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
行数:4 |
不允许使用 ON ERROR FAIL
报告状态。
UNWIND [1, 0, 2, 4] AS i
CALL (i) {
CREATE (n:Person {num: 100/i}) // Note, fails when i = 0
RETURN n
} IN TRANSACTIONS
OF 1 ROW
ON ERROR FAIL
REPORT STATUS AS s
RETURN n.num, s.errorMessage;
REPORT STATUS can only be used when specifying ON ERROR CONTINUE or ON ERROR BREAK
并发事务
默认情况下,CALL { … } IN TRANSACTIONS
是单线程的;一个 CPU 内核用于顺序执行批次。
但是,CALL
子查询还可以通过附加 IN [n] CONCURRENT TRANSACTIONS
并行执行批次,其中 n
是一个并发值,用于设置可以并行执行的事务的最大数量。这允许 CALL
子查询同时利用多个 CPU 内核,这可以显着减少执行大型外部事务所需的时间。
并发值是可选的。如果未指定,则将选择基于可用 CPU 内核数量的默认值。如果指定了负数,则并发数将是可用 CPU 内核数减去该负数的绝对值。 |
CALL { … } IN CONCURRENT TRANSACTIONS
特别适用于导入没有依赖项的数据。此示例在 3 个并发事务中创建 Person
节点,这些节点来自分配给 CSV 文件中每个人员行的唯一 tmdbId
值(共 444 个)。
LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/persons.csv' AS row
CALL (row) {
CREATE (p:Person {tmdbId: row.person_tmdbId})
SET p.name = row.name, p.born = row.born
} IN 3 CONCURRENT TRANSACTIONS OF 10 ROWS
RETURN count(*) AS personNodes
personNodes |
---|
|
行数:1 |
并发和非确定性结果
CALL { … } IN TRANSACTIONS
默认使用有序语义,其中批次按顺序逐行提交。例如,在 CALL { <I> } IN TRANSACTIONS
中,在 <I1>
执行中完成的任何写入都必须被 <I2>
等观察到。
相反,CALL { … } IN CONCURRENT TRANSACTIONS
使用并发语义,其中特定批次提交的行数和提交批次的顺序都是未定义的。也就是说,在 CALL { <I> } IN CONCURRENT TRANSACTIONS
中,在 <I1>
执行中提交的写入可能会或可能不会被 <I2>
等观察到。
因此,在并发事务中执行的 CALL
子查询的结果可能不是确定性的。为了保证确定性结果,请确保已提交批次的结果彼此之间不依赖。
死锁
当发生写入事务时,Neo4j 会获取锁以在更新时保持数据一致性。例如,在创建或删除关系时,会对特定关系及其连接的节点都获取写锁。
当两个事务相互阻塞时,就会发生死锁,因为它们试图同时修改被另一个事务锁定的节点或关系(有关 Neo4j 中锁和死锁的更多信息,请参阅操作手册 → 并发数据访问)。
如果两个或多个批次的事务尝试以导致它们之间出现循环依赖关系的顺序获取相同的锁,则在使用CALL { … } IN CONCURRENT TRANSACTIONS
时可能会发生死锁。如果发生这种情况,受影响的事务将始终回滚,并抛出错误,除非查询后附加了ON ERROR CONTINUE
或ON ERROR BREAK
。
以下查询尝试创建由RELEASED_IN
关系连接的Movie
和Year
节点。请注意,CSV 文件中只有三个不同的年份,这意味着应该只创建三个Year
节点。
LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/movies.csv' AS row
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS
死锁发生是因为两个事务同时尝试锁定并合并相同的Year
。
ForsetiClient[transactionId=64, clientId=12] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=63, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(98) because holders of that lock are waiting for ForsetiClient[transactionId=64, clientId=12].
Wait list:ExclusiveLock[
Client[63] waits for [ForsetiClient[transactionId=64, clientId=12]]]
以下查询使用ON ERROR CONTINUE
来绕过任何死锁并继续执行后续的内部事务。它返回失败事务的transactionID
、commitStatus
和errorMessage
。
ON ERROR CONTINUE
忽略死锁并完成外部事务的查询LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/movies.csv' AS row
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS ON ERROR CONTINUE REPORT STATUS as status
WITH status
WHERE status.errorMessage IS NOT NULL
RETURN status.transactionId AS transaction, status.committed AS commitStatus, status.errorMessage AS errorMessage
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| transaction | commitStatus | errorMessage |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
| "neo4j-transaction-169" | FALSE | "ForsetiClient[transactionId=169, clientId=11] can't acquire ExclusiveLock{owner=ForsetiClient[transactionId=168, clientId=9]} on NODE_RELATIONSHIP_GROUP_DELETE(46) because holders of that lock are waiting for ForsetiClient[transactionId=169, clientId=11]. |
| | \ Wait list:ExclusiveLock[ |
| | \ Client[168] waits for [ForsetiClient[transactionId=169, clientId=11]]]" |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
点击查看使用 Cypher 重试失败事务的示例
虽然可以使用驱动程序更有效地重试失败的事务,但以下是如何在同一个 Cypher®查询中重试失败事务的示例
LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/movies.csv' AS row
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS ON ERROR CONTINUE REPORT STATUS as status
WITH *
WHERE status.committed = false
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS ON ERROR FAIL