事务管理

本页面仅描述在使用 Neo4j Java API 时事务管理的一些特定方面,并提供了一些如何避免死锁、如何为特定数据库注册事务事件监听器以及如何在事务变更集之上执行基本操作的示例。

因此,强烈建议您在继续阅读本页面之前,先阅读《操作手册 → 数据库内部结构和事务行为》

概述

访问图、索引或模式的数据库操作在事务中执行,以确保 ACID 特性。事务是单线程的、受限的且独立的。可以在单个线程中启动多个事务,它们彼此独立。

使用事务的交互周期遵循以下步骤:

  1. 开始事务。

  2. 执行数据库操作。

  3. 提交或回滚事务。

完成每个事务至关重要,因为事务获取的锁或内存只会在完成后释放。有关锁和死锁的更多信息,请参阅《操作手册 → 锁和死锁》

在 Neo4j 中事务的惯用方式是使用 `try-with-resources` 语句,并将 `transaction` 声明为其中一个资源。然后启动事务并尝试执行图操作。`try` 块中的最后一个操作应根据业务逻辑提交或回滚事务。在这种情况下,`try-with-resources` 用作防止意外异常的保护措施,并作为额外的安全机制,确保无论语句块内部发生什么,事务都会被关闭。所有未提交的事务将在语句结束时的资源清理过程中回滚。对于已明确提交或回滚的事务,不需要进行资源清理,并且事务关闭是一个空操作。

事务中执行的所有修改都保留在内存中。这意味着非常大的更新必须分解为多个事务,以避免内存不足。

死锁处理示例

由 Neo4j 管理的锁之外的其他同步机制引起的死锁仍然可能发生。由于 Neo4j API 中的所有操作都是线程安全的,除非另有说明,因此不需要外部同步。需要同步的其他代码应以不在此同步块中执行任何 Neo4j 操作的方式进行同步。

以下是在过程、服务器扩展或使用嵌入式 Neo4j 时如何处理死锁的示例。

此代码片段的完整源代码可在DeadlockDocTest.java 中找到。

在代码中处理死锁时,您可能需要解决以下几个问题:

  • 只进行有限次数的重试,如果达到阈值则失败。

  • 在每次尝试之间暂停,以允许其他事务完成,然后再尝试。

  • 重试循环不仅对死锁有用,对其他类型的瞬时错误也很有用。

下面是一个展示如何实现此功能的示例。

示例 1. 使用重试循环处理死锁

此示例展示了如何使用重试循环来处理死锁

Throwable txEx = null;
int RETRIES = 5;
int BACKOFF = 3000;
for ( int i = 0; i < RETRIES; i++ )
{
    try ( Transaction tx = databaseService.beginTx() )
    {
        Object result = doStuff(tx);
        tx.commit();
        return result;
    }
    catch ( Throwable ex )
    {
        txEx = ex;

        // Add whatever exceptions to retry on here
        if ( !(ex instanceof DeadlockDetectedException) )
        {
            break;
        }
    }

    // Wait so that we don't immediately get into the same deadlock
    if ( i < RETRIES - 1 )
    {
        try
        {
            Thread.sleep( BACKOFF );
        }
        catch ( InterruptedException e )
        {
            throw new TransactionFailureException( "Interrupted", e );
        }
    }
}

if ( txEx instanceof TransactionFailureException )
{
    throw ((TransactionFailureException) txEx);
}
else if ( txEx instanceof Error )
{
    throw ((Error) txEx);
}
else
{
    throw ((RuntimeException) txEx);
}

事务事件

可以注册一个 `neo4j.org.graphdb.event.TransactionEventListener` 来接收 Neo4j 数据库事务事件。一旦它在一个 `org.neo4j.dbms.api.DatabaseManagementService` 实例上注册,它就会接收与其注册的数据库的事务事件。监听器会收到关于执行了任何写入操作并将被提交的事务的通知。如果 `Transaction#commit()` 未被调用,或者事务通过 `Transaction#rollback()` 回滚,则事务将被回滚,并且不会向监听器发送任何事件。

在事务提交之前,会调用监听器的 `beforeCommit` 方法,其中包含事务中所有修改的差异。此时事务仍在运行,因此仍然可以进行更改。该方法也可能抛出异常,从而阻止事务提交。如果事务回滚,将随后调用监听器的 `afterRollback` 方法。

监听器的执行顺序是未定义的——不保证一个监听器所做的更改会被其他监听器看到。

如果在所有已注册的监听器中成功执行了 `beforeCommit`,则事务将被提交,并且 `afterCommit` 方法将使用相同的事务数据被调用。此调用还包括从 `beforeCommit` 返回的对象。

在 `afterCommit` 中,事务已关闭,访问 `org.neo4j.graphdb.event.TransactionData` 之外的任何内容都需要打开新事务。`neo4j.org.graphdb.event.TransactionEventListener` 会收到通过 `org.neo4j.graphdb.event.TransactionData` 可访问的任何更改的事务通知。一些索引和模式更改不会触发这些事件。

以下示例展示了如何为特定数据库注册监听器,并在事务变更集之上执行基本操作。

此代码片段的完整源代码可在TransactionEventListenerExample.java 中找到。

示例 2. TransactionEventListener

注册事务事件监听器并检查变更集

public static void main( String[] args ) throws IOException
{
    FileUtils.deleteDirectory( HOME_DIRECTORY );
    var managementService = new DatabaseManagementServiceBuilder( HOME_DIRECTORY ).build();
    var database = managementService.database( DEFAULT_DATABASE_NAME );

    var countingListener = new CountingTransactionEventListener();
    managementService.registerTransactionEventListener( DEFAULT_DATABASE_NAME, countingListener );

    var connectionType = RelationshipType.withName( "CONNECTS" );
    try ( var transaction = database.beginTx() )
    {
        var startNode = transaction.createNode();
        var endNode = transaction.createNode();
        startNode.createRelationshipTo( endNode, connectionType );
        transaction.commit();
    }
}

private static class CountingTransactionEventListener implements TransactionEventListener<CreatedEntitiesCounter>
{
    @Override
    public CreatedEntitiesCounter beforeCommit( TransactionData data, Transaction transaction, GraphDatabaseService databaseService ) throws Exception
    {
        return new CreatedEntitiesCounter( size( data.createdNodes() ), size( data.createdRelationships() ) );
    }

    @Override
    public void afterCommit( TransactionData data, CreatedEntitiesCounter entitiesCounter, GraphDatabaseService databaseService )
    {
        System.out.println( "Number of created nodes: " + entitiesCounter.getCreatedNodes() );
        System.out.println( "Number of created relationships: " + entitiesCounter.getCreatedRelationships() );
    }

    @Override
    public void afterRollback( TransactionData data, CreatedEntitiesCounter state, GraphDatabaseService databaseService )
    {
    }
}

private static class CreatedEntitiesCounter
{
    private final long createdNodes;
    private final long createdRelationships;

    public CreatedEntitiesCounter( long createdNodes, long createdRelationships )
    {
        this.createdNodes = createdNodes;
        this.createdRelationships = createdRelationships;
    }

    public long getCreatedNodes()
    {
        return createdNodes;
    }

    public long getCreatedRelationships()
    {
        return createdRelationships;
    }
}
© . All rights reserved.