JavaScript 中的 CDC 示例

const neo4j = require('neo4j-driver');
const yargs = require('yargs/yargs');
const {hideBin} = require('yargs/helpers');
const argv = yargs(hideBin(process.argv)).argv;
const util = require('util');


function applyChange(change) { (1)
  console.log(util.inspect(change.toObject(), false, null, true));
}

async function queryChanges(driver, database, cursor, selectors) {
  const session = driver.session({database: database});
  try {
    const current = currentChangeId(driver, database);
    await session.executeRead((tx) => {
      return tx.run('CALL db.cdc.query($from, $selectors)',
          {from: cursor, selectors: selectors}); (2)
    })
        .then((res) => {
          if (res.records.length === 0) {
            cursor = current; (3)
          } else {
            res.records.map((change) => {
              applyChange(change); (4)
              cursor = change.get('id'); (5)
            });
          }
        });
  } catch (e) {
    console.log('Failed to apply change', e);
  } finally {
    session.close();
  }
  return cursor;
}
function queryChangesLoop(driver, database, cursor, selectors) {
  setTimeout(async () => {
    cursor = await queryChanges(driver, database, cursor, selectors);
    queryChangesLoop(driver, database, cursor, selectors);
  }, 500);
}

function earliestChangeId(driver, database) { (6)
  return driver.executeQuery('CALL db.cdc.earliest', {}, {database: database})
      .then((res) => {
        return res.records[0].get('id');
      });
}

function currentChangeId(driver, database) { (7)
  return driver.executeQuery('CALL db.cdc.current', {}, {'database': database})
      .then((res) => {
        return res.records[0].get('id');
      });
}


async function main() {
  const uri = argv.address ?? argv.a ?? 'neo4j://localhost:7687';
  const database = argv.database ?? argv.d ?? 'neo4j';
  const user = argv.user ?? argv.u ?? 'neo4j';
  const password = argv.password ?? argv.p ?? 'passw0rd';
  let cursor = argv.from ?? argv.f;

  const driver = neo4j.driver(uri, neo4j.auth.basic(user, password));

  if (!cursor) {
    cursor = await currentChangeId(driver, database);
  }

  const selectors = [ (8)
    // {"select":"n"}
  ];
  queryChangesLoop(driver, database, cursor, selectors); (9)

  // await driver.close() (10)
}

main();
1 此方法对每个更改调用一次。
2 此查询从数据库获取更改。
3 光标向前移动以保持最新状态。这可能在您的用例中不必要。有关详细信息,请参阅光标管理
4 对每个更改调用一次函数。
5 请注意,在匿名函数之外定义的光标已更新。session.executeRead 可能会重试查询,重新运行内部匿名函数。为了避免两次看到相同的更改,请在应用更改时更新光标。
6 使用此函数获取最早可用的更改 ID。
7 使用此函数获取当前更改 ID。
8 结果可以被过滤以返回更改的子集。注释掉的行将仅选择节点更改并排除所有关系更改。
9 重复调用 queryChanges,使用上一次调用的光标。
10 当循环应该终止时调用 driver.close()