事务管理

此页面仅描述了使用 Neo4j Java API 时事务管理的某些特定方面,并提供了一些关于如何避免死锁以及如何为特定数据库注册事务事件侦听器并执行基本操作的示例在事务变更集之上。

因此,强烈建议您阅读操作手册 → 数据库内部结构和事务行为,然后再继续阅读本页。

概述

访问图、索引或模式的数据库操作在事务中执行,以确保 ACID 属性。事务是单线程的、封闭的和独立的。可以在单个线程中启动多个事务,它们彼此独立。

使用事务的交互周期遵循以下步骤

  1. 开始事务。

  2. 执行数据库操作。

  3. 提交或回滚事务。

完成每个事务至关重要,因为事务获取的锁或内存只有在完成时才会释放。有关锁和死锁的更多信息,请参阅操作手册 → 锁和死锁

Neo4j 中事务的惯用方式是使用 try-with-resources 语句并声明 transaction 作为其中一个资源。然后启动事务并尝试执行图操作。try 块中的最后一个操作应根据业务逻辑提交或回滚事务。在这种情况下,try-with-resources 用作防止意外异常的保护措施,以及确保无论语句块内部发生什么都关闭事务的附加安全机制。所有未提交的事务将在语句结束时作为资源清理的一部分回滚。对于明确提交或回滚的事务,不需要资源清理,事务关闭是一个空操作。

事务中执行的所有修改都保存在内存中。这意味着必须将非常大的更新拆分成多个事务,以避免内存不足。

死锁处理示例

由 Neo4j 管理的锁以外的其他同步导致的死锁仍然可能发生。由于 Neo4j API 中的所有操作都是线程安全的(除非另有说明),因此不需要外部同步。需要同步的其他代码应以在同步块中从不执行任何 Neo4j 操作的方式同步。

以下是如何在过程、服务器扩展或使用嵌入式 Neo4j 时处理死锁的示例。

代码片段使用的完整源代码可以在DeadlockDocTest.java 中找到。

在处理代码中的死锁时,您可能需要解决几个问题

  • 只进行有限次数的重试,如果达到阈值,则失败。

  • 在每次尝试之间暂停,以允许另一个事务在再次尝试之前完成。

  • 重试循环不仅对死锁有用,而且对其他类型的瞬态错误也有用。

以下是关于如何实现这一点的示例。

示例 1. 使用重试循环处理死锁

此示例展示了如何使用重试循环来处理死锁

Throwable txEx = null;
int RETRIES = 5;
int BACKOFF = 3000;
for ( int i = 0; i < RETRIES; i++ )
{
    try ( Transaction tx = databaseService.beginTx() )
    {
        Object result = doStuff(tx);
        tx.commit();
        return result;
    }
    catch ( Throwable ex )
    {
        txEx = ex;

        // Add whatever exceptions to retry on here
        if ( !(ex instanceof DeadlockDetectedException) )
        {
            break;
        }
    }

    // Wait so that we don't immediately get into the same deadlock
    if ( i < RETRIES - 1 )
    {
        try
        {
            Thread.sleep( BACKOFF );
        }
        catch ( InterruptedException e )
        {
            throw new TransactionFailureException( "Interrupted", e );
        }
    }
}

if ( txEx instanceof TransactionFailureException )
{
    throw ((TransactionFailureException) txEx);
}
else if ( txEx instanceof Error )
{
    throw ((Error) txEx);
}
else
{
    throw ((RuntimeException) txEx);
}

事务事件

一个neo4j.org.graphdb.event.TransactionEventListener 可以注册以接收 Neo4j 数据库事务事件。一旦它在org.neo4j.dbms.api.DatabaseManagementService 实例中注册,它就会接收其注册的数据库的事务事件。侦听器会收到已执行任何写入操作并将要提交的事务的通知。如果未调用 Transaction#commit(),或者事务已使用 Transaction#rollback() 回滚,它将回滚,并且不会向侦听器发送任何事件。

在提交事务之前,侦听器的 beforeCommit 方法将被调用,并包含在事务中进行的所有修改的整个差异。此时事务仍在运行,因此仍然可以进行更改。该方法也可能抛出异常,这将阻止事务提交。如果事务回滚,则会调用侦听器的 afterRollback 方法。

侦听器执行的顺序是不确定的——不能保证一个侦听器做出的更改会为其他侦听器看到。

如果所有注册的侦听器都成功执行了 beforeCommit,则事务将提交,并且 afterCommit 方法将使用相同的事务数据调用。此调用还包括从 beforeCommit 返回的对象。

afterCommit 中,事务将关闭,对org.neo4j.graphdb.event.TransactionData 以外的任何内容的访问都需要打开一个新事务。一个neo4j.org.graphdb.event.TransactionEventListener 会收到已进行任何更改(可通过org.neo4j.graphdb.event.TransactionData 访问)的事务的通知。一些索引和模式更改不会触发这些事件。

以下示例展示了如何为特定数据库注册侦听器并执行基本操作在事务变更集之上。

用于代码片段的完整源代码可以在 TransactionEventListenerExample.java 中找到。

示例 2. TransactionEventListener

注册事务事件监听器并检查更改集

public static void main( String[] args ) throws IOException
{
    FileUtils.deleteDirectory( HOME_DIRECTORY );
    var managementService = new DatabaseManagementServiceBuilder( HOME_DIRECTORY ).build();
    var database = managementService.database( DEFAULT_DATABASE_NAME );

    var countingListener = new CountingTransactionEventListener();
    managementService.registerTransactionEventListener( DEFAULT_DATABASE_NAME, countingListener );

    var connectionType = RelationshipType.withName( "CONNECTS" );
    try ( var transaction = database.beginTx() )
    {
        var startNode = transaction.createNode();
        var endNode = transaction.createNode();
        startNode.createRelationshipTo( endNode, connectionType );
        transaction.commit();
    }
}

private static class CountingTransactionEventListener implements TransactionEventListener<CreatedEntitiesCounter>
{
    @Override
    public CreatedEntitiesCounter beforeCommit( TransactionData data, Transaction transaction, GraphDatabaseService databaseService ) throws Exception
    {
        return new CreatedEntitiesCounter( size( data.createdNodes() ), size( data.createdRelationships() ) );
    }

    @Override
    public void afterCommit( TransactionData data, CreatedEntitiesCounter entitiesCounter, GraphDatabaseService databaseService )
    {
        System.out.println( "Number of created nodes: " + entitiesCounter.getCreatedNodes() );
        System.out.println( "Number of created relationships: " + entitiesCounter.getCreatedRelationships() );
    }

    @Override
    public void afterRollback( TransactionData data, CreatedEntitiesCounter state, GraphDatabaseService databaseService )
    {
    }
}

private static class CreatedEntitiesCounter
{
    private final long createdNodes;
    private final long createdRelationships;

    public CreatedEntitiesCounter( long createdNodes, long createdRelationships )
    {
        this.createdNodes = createdNodes;
        this.createdRelationships = createdRelationships;
    }

    public long getCreatedNodes()
    {
        return createdNodes;
    }

    public long getCreatedRelationships()
    {
        return createdRelationships;
    }
}