C# 中 CDC 用法的示例

using System.CommandLine;
using System.Text.Json;
using Neo4j.Driver;

#pragma warning disable CS4014

namespace Neo4j.CDC.Sample;

static class Program
{
    /// <summary>
    /// Polls a Neo4j database for change data capture (CDC) events and hands every
    /// change to <see cref="ApplyChange"/>.
    /// </summary>
    class CDCSample
    {
        private readonly IDriver _driver;
        private readonly string? _database;

        // Cursor: the change identifier the next db.cdc.query call starts from.
        // Written by the polling task, hence volatile.
        private volatile string? _from;
        private readonly IEnumerable<object> _selectors;

        // Signalled once the polling loop has finished, for any reason.
        private readonly ManualResetEventSlim _event;

        /// <summary>Creates a new sample bound to the given driver.</summary>
        /// <param name="driver">Driver used for all queries; must not be null.</param>
        /// <param name="database">Database to read changes from; null or empty selects the default database.</param>
        /// <param name="from">Change identifier to start from; when null or empty, <see cref="Start"/> uses the current one.</param>
        /// <param name="selectors">Selectors passed to db.cdc.query; null means no filtering.</param>
        /// <exception cref="ArgumentNullException"><paramref name="driver"/> is null.</exception>
        public CDCSample(IDriver driver, string? database, string? from, IEnumerable<object>? selectors)
        {
            _driver = driver ?? throw new ArgumentNullException(nameof(driver));
            _database = database;
            _from = from;
            _selectors = selectors ?? Enumerable.Empty<object>();
            _event = new ManualResetEventSlim();
        }

        /// <summary>Prints the values of a single change record as indented JSON on standard output.</summary>
        private static void ApplyChange(IRecord record) // (1)
        {
            var jsonText = JsonSerializer.Serialize(record.Values, new JsonSerializerOptions()
            {
                WriteIndented = true
            });

            Console.WriteLine(jsonText);
        }

        /// <summary>
        /// Runs db.cdc.query once from the current cursor, applies every returned change
        /// and advances the cursor.
        /// </summary>
        /// <param name="cancellation">Stops the enumeration of the result stream.</param>
        private async Task QueryChanges(CancellationToken cancellation) // (2)
        {
            await using var session = _driver.AsyncSession(ConfigureSession(_database));
            var current = await CurrentChangeId();
            await session.ExecuteReadAsync(async tx =>
            {
                // Read the cursor inside the transaction function so that a retry
                // resumes after the changes that were already applied.
                var from = _from;
                var result = await tx.RunAsync("CALL db.cdc.query($from, $selectors)", new
                {
                    from,
                    selectors = _selectors,
                });

                var processed = 0;
                await foreach (var record in result.WithCancellation(cancellation))
                {
                    ApplyChange(record); // (3)
                    _from = record["id"].As<string>(); // (4)
                    processed++;
                }

                if (processed == 0)
                {
                    // Nothing new: move the cursor to the identifier captured before the query.
                    _from = current; // (5)
                }
            });
        }

        /// <summary>Builds a session configuration that selects <paramref name="database"/> when one is given.</summary>
        private static Action<SessionConfigBuilder> ConfigureSession(string? database)
        {
            return sc =>
            {
                if (!string.IsNullOrEmpty(database))
                {
                    sc.WithDatabase(database);
                }
            };
        }

        /// <summary>
        /// Runs a CDC procedure call that yields a single <c>id</c> column and returns
        /// the first value, against the same database the change queries use.
        /// </summary>
        /// <param name="query">The complete Cypher call, e.g. <c>CALL db.cdc.current</c>.</param>
        private async Task<string> QueryChangeId(string query)
        {
            // Change identifiers are fetched from the configured database; without this
            // the query would run against the default database while QueryChanges reads
            // from _database.
            var database = string.IsNullOrEmpty(_database) ? null : _database;

            var response = await _driver
                .ExecutableQuery(query)
                .WithConfig(new QueryConfig(database: database))
                .WithMap(record => record["id"].As<string>())
                .ExecuteAsync();

            return response.Result[0];
        }

        /// <summary>Returns the earliest change identifier reported by db.cdc.earliest.</summary>
        private Task<string> EarliestChangeId() // (6)
        {
            return QueryChangeId("CALL db.cdc.earliest");
        }

        /// <summary>Returns the current change identifier reported by db.cdc.current.</summary>
        private Task<string> CurrentChangeId() // (7)
        {
            return QueryChangeId("CALL db.cdc.current");
        }

        /// <summary>
        /// Resolves the initial cursor if none was supplied and starts the background
        /// polling loop. Returns as soon as the loop has been scheduled.
        /// </summary>
        /// <param name="cancellation">Cancelling this token stops the polling loop.</param>
        public async Task Start(CancellationToken cancellation)
        {
            if (string.IsNullOrEmpty(_from))
            {
                _from = await CurrentChangeId();
            }

            _event.Reset();

            // The token is deliberately not passed to Task.Run: the loop must always
            // execute so that its finally block signals _event, even when the token is
            // already cancelled. Otherwise WaitForExit would block forever.
            _ = Task.Run(() => PollChanges(cancellation));
        }

        /// <summary>
        /// Queries for changes every 500 milliseconds until cancellation is requested
        /// or a query fails, then signals <see cref="WaitForExit"/>.
        /// </summary>
        private async Task PollChanges(CancellationToken cancellation)
        {
            try
            {
                while (!cancellation.IsCancellationRequested)
                {
                    await QueryChanges(cancellation);

                    await Task.Delay(TimeSpan.FromMilliseconds(500), cancellation); // (8)
                }
            }
            catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
            {
                // Requested shutdown: not an error.
            }
            catch (Exception e)
            {
                // Nobody awaits this task, so report the failure here instead of losing it.
                Console.Error.WriteLine("Error: " + e);
            }
            finally
            {
                _event.Set();
            }
        }

        /// <summary>Blocks the calling thread until the polling loop has finished.</summary>
        public void WaitForExit()
        {
            _event.Wait();
        }
    }

    /// <summary>
    /// Entry point: parses the command line options and runs the CDC sample until
    /// the invocation is cancelled.
    /// </summary>
    /// <param name="args">Command line arguments.</param>
    /// <returns>The exit code produced by the command line invocation.</returns>
    static async Task<int> Main(string[] args)
    {
        var uriOption = new Option<string>("--address", () => "bolt://localhost:7687", "Bolt URI");
        uriOption.AddAlias("-a");
        var databaseOption = new Option<string?>("--database", () => "", "Database");
        databaseOption.AddAlias("-d");
        var usernameOption = new Option<string>("--username", () => "neo4j", "Username");
        usernameOption.AddAlias("-u");
        var passwordOption = new Option<string>("--password", () => "passw0rd", "Password");
        passwordOption.AddAlias("-p");
        var fromOption = new Option<string?>("--from", () => null, "Change identifier to query changes from");
        fromOption.AddAlias("-f");

        var cmd = new RootCommand("Sample CDC application");
        cmd.AddOption(uriOption);
        cmd.AddOption(databaseOption);
        cmd.AddOption(usernameOption);
        cmd.AddOption(passwordOption);
        cmd.AddOption(fromOption);

        // Asynchronous handler: awaiting the command instead of blocking on it lets a
        // cancelled invocation finish its own shutdown rather than being abandoned by
        // an OperationCanceledException thrown from Task.Wait.
        cmd.SetHandler(async ctx =>
        {
            var cancellation = ctx.GetCancellationToken();
            var uri = ctx.ParseResult.GetValueForOption(uriOption);
            var database = ctx.ParseResult.GetValueForOption(databaseOption);
            var username = ctx.ParseResult.GetValueForOption(usernameOption);
            var password = ctx.ParseResult.GetValueForOption(passwordOption);
            var from = ctx.ParseResult.GetValueForOption(fromOption);

            await DoRootCommand(cancellation, uri!, username!, password!, database, from);
        });

        return await cmd.InvokeAsync(args);
    }

    /// <summary>
    /// Connects to the server, starts the CDC sample and blocks until it has exited.
    /// Any exception is written to standard error instead of being propagated.
    /// </summary>
    /// <param name="cancellation">Token handed to the sample to stop it.</param>
    /// <param name="uri">Bolt URI of the server.</param>
    /// <param name="username">User name for basic authentication.</param>
    /// <param name="password">Password for basic authentication.</param>
    /// <param name="database">Database to read changes from, if any.</param>
    /// <param name="from">Change identifier to start from, if any.</param>
    private static async Task DoRootCommand(CancellationToken cancellation, string uri, string username,
        string password,
        string? database, string? from)
    {
        try
        {
            // Empty by default, i.e. no filtering; the commented entry shows a selector.
            var changeSelectors = new List<object>
            {
                // new (9)
                // {
                //     select = "n", labels = new[] { "Person", "Employee" }
                // },
            };

            var credentials = AuthTokens.Basic(username, password);
            await using (var driver = GraphDatabase.Driver(uri, credentials))
            {
                var sample = new CDCSample(driver, database, from, changeSelectors);
                await sample.Start(cancellation);

                var stdout = Console.Out;
                await stdout.WriteLineAsync("starting...");
                sample.WaitForExit();
                await stdout.WriteLineAsync("quitting...");
            }
        }
        catch (Exception error)
        {
            await Console.Error.WriteLineAsync($"Error: {error}");
        }
    }
}
1 此方法针对每个更改事件调用一次。应根据您的用例进行替换。
2 此方法从数据库中获取更改。
3 此方法针对每个更改调用一次。
4 请注意，ExecuteReadAsync 可能会重试失败的查询。为了避免看到相同的更改两次，请在应用更改时更新游标。
5 游标向前移动以保持最新。这在您的用例中可能不是必需的。有关详细信息，请参阅“游标管理”。
6 使用此函数获取最早可用的更改 ID。
7 使用此函数获取当前更改 ID。
8 等待 500 毫秒，然后再次调用 QueryChanges。
9 结果可能会被过滤以返回更改的子集。注释掉的代码行将仅选择同时具有 Person 和 Employee 标签的节点更改。