Python 中 CDC 使用示例

import getopt
import json
import sys
import time
from threading import Thread

from neo4j import GraphDatabase


class CDCService:
    """Poll a database for change data capture (CDC) events.

    The service holds a cursor (a change identifier) and repeatedly runs
    ``CALL db.cdc.query`` from that cursor, passing every returned record
    to :meth:`apply_change`.

    Trailing ``# (n)`` comments are callout markers that refer to the
    numbered notes accompanying this listing.
    """

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        """Store connection details and choose the starting cursor.

        Args:
            driver: Driver object used to open sessions and run queries.
            database: Name of the database the CDC procedures are run on.
            start_cursor: Change identifier to start from. When ``None``,
                the value returned by :meth:`current_change_id` is used.
            selectors: Value passed as ``$selectors`` to ``db.cdc.query``.
        """
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        self.selectors = selectors

    def apply_change(self, record):  # (1)
        """Print the main fields of one change record as indented JSON.

        Args:
            record: Mapping-like change record; it is read with ``.get``.
        """
        fields = ('id', 'txId', 'seq', 'event', 'metadata')
        record_dict = {k: record.get(k) for k in fields}
        # default=repr keeps values that json cannot encode printable.
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        """Fetch changes after the cursor and apply them one by one.

        When no changes are returned, the cursor is moved to the change
        identifier that was current before the query ran. Otherwise the
        cursor is advanced after each successfully applied record, so a
        re-run of this function does not apply the same change twice.

        Args:
            tx: Transaction object exposing ``run(query, **parameters)``.
        """
        # Read the current change id before querying so that an empty
        # result can move the cursor forward to a known position.
        current = self.current_change_id()
        result = tx.run('CALL db.cdc.query($cursor, $selectors)',  # (2)
                        cursor=self.cursor, selectors=self.selectors)
        if result.peek() is None:
            self.cursor = current  # (3)
        else:
            for record in result:
                try:
                    self.apply_change(record)  # (4)
                except Exception as e:
                    # Stop at the first failure; the cursor still points
                    # at the last change that was applied successfully.
                    print('Error whilst applying change', e)
                    break
                self.cursor = record['id']  # (5)

    def query_changes(self):
        """Run :meth:`query_changes_query` in a read transaction."""
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def earliest_change_id(self):  # (6)
        """Return the ``id`` of the first row of ``db.cdc.earliest``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self):  # (7)
        """Return the ``id`` of the first row of ``db.cdc.current``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.current', database_=self.database)
        return records[0]['id']

    def run(self):
        """Poll for changes forever, sleeping 0.5 seconds between polls."""
        while True:  # (9)
            self.query_changes()
            time.sleep(0.5)

def main(argv):
    """Parse command-line options and run the CDC polling service.

    Recognized options (each takes a value): ``-a/--address``,
    ``-d/--database``, ``-u/--username``, ``-p/--password`` and
    ``-f/--from`` (the change identifier to start from).

    Trailing ``# (n)`` comments are callout markers that refer to the
    numbered notes accompanying this listing.

    Args:
        argv: Command-line arguments without the program name.
    """
    # Default values
    address = 'neo4j://localhost:7687'
    database = 'neo4j'
    username = 'neo4j'
    password = 'passw0rd'
    cursor = None

    opts, _ = getopt.getopt(
        argv, 'a:d:u:p:f:',
        ['address=', 'database=', 'username=', 'password=', 'from='])
    for opt, arg in opts:
        if opt in ('-a', '--address'):
            address = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-p', '--password'):
            password = arg
        elif opt in ('-f', '--from'):
            cursor = arg

    # Empty by default; uncomment the entry below to use it as a selector.
    selectors = [  # (8)
        # {'select': 'n'}
    ]

    with GraphDatabase.driver(address, auth=(username, password)) as driver:
        cdc = CDCService(driver, database, cursor, selectors)
        # Polling runs on a daemon thread; joining keeps the driver open
        # for as long as the polling loop is running.
        cdc_thread = Thread(target=cdc.run, daemon=True)
        cdc_thread.start()
        cdc_thread.join()


# Script entry point: forward everything after the program name to main().
if __name__ == '__main__':
    cli_args = sys.argv[1:]
    main(cli_args)
1 此方法针对每个更改事件调用一次。应根据您的用例进行替换。
2 此查询从数据库中获取更改。
3 游标向前移动以保持最新状态。在您的用例中可能不需要这样做。有关详细信息，请参阅“游标管理”。
4 此方法针对每个更改调用一次。
5 例如，当存在网络问题时，session.execute_read 可能会重试 query_changes_query。为避免重复处理相同的更改，请在每次成功应用更改后立即更新游标。
6 使用此函数获取最早可用的更改 ID。
7 使用此函数获取当前更改 ID。
8 可以过滤结果以返回更改的子集。已注释掉的代码行将仅选择节点更改并排除所有关系更改。
9 重复调用 query_changes,使用上一次调用的游标。