Java 中的 CDC 使用示例

package org.neo4j.example;

import org.neo4j.driver.*;
import org.neo4j.driver.Record;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static picocli.CommandLine.*;

class CDCService implements Closeable {
    private final Driver driver;
    private final String database;
    private final AtomicReference<String> cursor = new AtomicReference<>(null);
    private final ScheduledThreadPoolExecutor scheduler;

    private List<Map<String, Object>> selectors = List.of();

    public CDCService(String uri, String database, String user, String password) {
        driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
        scheduler = new ScheduledThreadPoolExecutor(1);
        this.database = database;
    }

    @Override
    public void close(){
        driver.close();
        scheduler.shutdown();
    }

    public void setSelectors(List<Map<String, Object>> selectors) {
        this.selectors = selectors;
    }

    private void applyChange(Record change) { (1)
        var eventType = change.get("event").asMap().get("eventType");
        var operation = change.get("event").asMap().get("operation");
        System.out.println("Element of type '%s' changed with operation '%s'".formatted(eventType, operation));
        System.out.println(change);
    }

    private synchronized void queryChanges() { (2)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            var current = currentChangeID();
            session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.query($from, $selectors)", Map.of("from", cursor.get(), "selectors", selectors)));
                if (!result.hasNext()) {
                    cursor.set(current); (3)
                } else {
                    while (result.hasNext()) {
                        var change = result.next();
                        applyChange(change); (4)
                        cursor.set(change.get("id").asString()); (5)
                    }
                }
                return cursor.get();
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
            throw new RuntimeException("Error querying/processing changes.", e);
        }
    }

    private String earliestChangeID() { (6)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.earliest"));
                return result.single().get("id").asString();
            });
        }
    }

    private String currentChangeID() { (7)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.current"));
                return result.single().get("id").asString();
            });
        }
    }

    public void start(String from) {
        cursor.set(from == null ? currentChangeID() : from);
        scheduler.scheduleWithFixedDelay(this::queryChanges, 0, 500, TimeUnit.MILLISECONDS); (8)
        Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
    }


    public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
        scheduler.awaitTermination(timeout, unit);
    }
}

@Command(name = "Neo4j CDC example usage", mixinStandardHelpOptions = true, version = "1.0", description = "Connects to neo4j and queries for change events through cdc procedures.")
public class Example implements Callable<Integer> {

    @Option(names = {"-a", "--address"}, description = "Where to find the neo4j instance to connect to")
    private String address = "bolt://localhost:7687";
    @Option(names = {"-d", "--database"}, description = "Which database to access")
    private String database = "neo4j";
    @Option(names = {"-u", "--username"}, description = "Username for authenticating against the neo4j server")
    private String username = "neo4j";
    @Option(names = {"-p", "--password"}, description = "Password for authenticating against the neo4j server")
    private String password = "passw0rd";
    @Option(names = {"-f", "--from"}, description = "Cursor value to start streaming from")
    private String from = null;


    @Override
    public Integer call() {

        var cdcService = new CDCService(address, database, username, password);
        List<Map<String, Object>> selectors = List.of( (9)
//                Map.of("select", "n")
        );
        cdcService.setSelectors(selectors);
        cdcService.start(from);

        System.out.println("started querying changes...");

        try {
            cdcService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println("quitting...");
        System.out.flush();
        return 0;
    }

    public static void main(String... args) {
        int exitCode = new CommandLine(new Example()).execute(args);
        System.exit(exitCode);
    }
}
1 此方法对每个更改调用一次。
2 此查询从数据库获取更改。
3 游标将向前移动以保持其最新状态。这在您的用例中可能不是必需的。有关详细信息,请参阅游标管理
4 对每个更改调用一次函数。
5 请注意,executeRead 可能会重试失败的查询。为了避免看到相同的更改两次,请在应用更改时更新游标。
6 使用此函数获取最早可用的更改 ID。
7 使用此函数获取当前更改 ID。
8 调度以便queryChanges 被重复调用。
9 结果可以被过滤以返回更改的子集。已注释掉的代码行将仅选择节点更改并排除所有关系更改。