Confluent Platform Quickstart

Overview

We will use Docker Compose to create an environment containing the Confluent Platform components and Neo4j, all running inside Docker.

The Neo4j Kafka connector will first be configured as a source instance. The source will capture changes caused by CREATE, UPDATE and DELETE operations on nodes matching the pattern (:TestSource). The captured changes will be published, depending on the operation, to the creates, updates and deletes topics.

Next, we will create a sink instance that listens for messages on the creates, updates and deletes topics and, upon receiving a message, executes a Cypher statement to apply the corresponding change in Neo4j.

The following guide uses the Confluent Platform docker images.

Running with Docker Compose

Copy the following Docker Compose file into a directory of your choice.

The example docker-compose.yml file below makes use of recent docker compose features and therefore requires an up-to-date version of docker compose. Make sure you have at least version v2.20.3 of the tool.
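
You can quickly verify which version is installed (a simple sanity check; the exact output format may differ between releases):

docker compose version
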
docker-compose.yml
---
services:
  neo4j:
    image: neo4j:5-enterprise
    hostname: neo4j
    container_name: neo4j
    # this is to ensure you have the latest 5.x version of the database
    pull_policy: always
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_server_memory_heap_max__size: "4G"
    healthcheck:
      test: [ "CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1" ]
      start_period: 2m
      start_interval: 10s
      interval: 30s
      timeout: 10s
      retries: 5


  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "2181" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  broker:
    image: confluentinc/cp-server:7.5.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8081" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  connect:
    image: confluentinc/cp-server-connect:7.5.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8083" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

Copy the following Neo4j Kafka connector artifact into a directory named plugins, located in the same directory as your docker-compose.yml file. The directory structure should look like this:

quickstart/
├─ plugins/
│  ├─ neo4j-kafka-connect-5.1.5.jar
├─ docker-compose.yml
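
As a sketch, assuming the connector jar has already been downloaded into your current working directory, the layout above can be created like this (adjust the jar path to wherever you saved the artifact):

mkdir -p quickstart/plugins
cp neo4j-kafka-connect-5.1.5.jar quickstart/plugins/
cp docker-compose.yml quickstart/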

Open a terminal, change into the directory containing the Docker Compose file and run:

docker compose up -d

Once this process completes, you should have all the modules up and running. You can check the status of all the services as follows:

docker compose ps

This will return a table showing that every service is up and running.

NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect             "bash -c '# confluen…"   connect             running             0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running             0.0.0.0:9021->9021/tcp
neo4j               "tini -g -- /startup…"   neo4j               running             0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running             0.0.0.0:8081->8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

You can now access your Neo4j instance at http://localhost:7474. Log in with neo4j as the username and password as the password (update the NEO4J_AUTH environment variable in the Docker Compose file to change it). Confirm that you can access your Confluent Control Center instance at http://localhost:9021/clusters and that the cluster is reported as healthy (this may take 90-120 seconds). You should have one broker, several topics and one connect cluster in Control Center.
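
If you prefer the command line, a quick way to confirm that Neo4j is accepting connections is to run cypher-shell inside the container, mirroring the healthcheck defined in the Compose file (the credentials are the ones set via NEO4J_AUTH):

docker compose exec neo4j cypher-shell -u neo4j -p password "RETURN 1;"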

Enable CDC

Enable change data capture on the source database by executing the following Cypher command. For more information on change data capture and how to enable it, refer to Change Data Capture > Enable CDC > Neo4j DBMS for on-premise installations and Change Data Capture > Enable CDC > Aura for Aura.

ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';
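
One way to confirm that transaction log enrichment is now active is to inspect the database options. This sketch assumes a recent Neo4j 5 release where SHOW DATABASES exposes an options column containing txLogEnrichment:

SHOW DATABASES YIELD name, options WHERE name = 'neo4j' RETURN name, options;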

Source with CDC

First, we need to set up Neo4j as a source database that provides messages for the topics. Pick one of the following message serialization formats, save the provided file contents into a local directory and name the file source.neo4j.json.

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}

Now, we will create the source instance with the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.neo4j.json

This will create a Kafka Connect source instance that sends change event messages to the topics named creates, updates and deletes, serialized in your chosen format. In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.

As shown above, you can configure multiple patterns for reading changes and publish them to topics of your choice. With the configuration above, the connector will read changes that happen on nodes labelled TestSource; the message structure follows Change Data Capture > Change event schema and is serialized in the configured message format. Depending on the operation type, the change events are expected to have the following structure.
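
You can also check the connector from the command line via the Kafka Connect REST API. The connector name below assumes the AVRO configuration was used; replace it with the name from your source.neo4j.json if you chose another format:

curl -s http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status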

{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": null,
      "after": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "c",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<old-name>",
          "surname": "<old-surname>"
        },
        "labels": ["TestSource"]
      },
      "after": {
        "properties": {
          "name": "<new-name>",
          "surname": "<new-surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "u",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      },
      "after": null
    },
    "operation": "d",
    "labels": ["TestSource"]
  }
}

Now that you have a running source instance, you can create the following nodes in Neo4j:

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

This will result in new messages being published to the topic named creates.
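
To peek at these messages directly, you can run a console consumer inside the schema-registry container. The sketch below assumes the AVRO serialization format was chosen; for JSON Schema or Protobuf, use the corresponding console consumer instead:

docker compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic creates \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081 \
  --max-messages 3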

Sink with Cypher

With the source connector set up, the next step is to configure the sink connector, which will consume the messages published to the creates, updates and deletes topics.

First, pick the configuration below that matches the serialization format you chose for the source and save it into a local file named sink.neo4j.json.

{
  "name": "Neo4jSinkConnectorCypherAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

Now, we will create the sink instance with the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @sink.neo4j.json

This will configure the sink instance to consume data in your chosen serialization format. The Cypher strategy will build Cypher queries based on the query templates defined by the neo4j.cypher.topic.creates, neo4j.cypher.topic.updates and neo4j.cypher.topic.deletes properties.
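
As with the source, you can check the sink from the command line via the Kafka Connect REST API (the connector name below assumes the AVRO configuration; adjust it to match your sink.neo4j.json):

curl -s http://localhost:8083/connectors/Neo4jSinkConnectorCypherAVRO/status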

Testing

You can now access your Confluent Control Center instance at http://localhost:9021/clusters and verify that at least the topic named creates has been created (as specified in the connector configuration), and that both the source and sink connector instances are running under Connect, connect-default.

With both the source and sink connectors running, the previously created :TestSource nodes cause the source instance to publish messages to the creates topic. These messages are then consumed by the sink instance, which creates the corresponding :Person and :Family nodes in Neo4j. The updates and deletes topics will also be created as you create, update and delete nodes labelled TestSource.

Verify that this is the case by executing the following query in the Neo4j Browser at http://localhost:7474/browser/.

MATCH (n:(Person | Family)) RETURN n
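
As an additional check, the following query (a sketch based on the sink's creates template) lists the Person nodes together with the Family they were linked to:

MATCH (p:Person)-[:BELONGS_TO]->(f:Family) RETURN p.name, p.surname, f.name;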

Now, you can create, update or delete Person and Family nodes by executing more statements like the following:

Create a new person
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

Verify that a new Person node and a new Family node are created and linked to each other.
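
For example, given the sink's creates template above, the following query should return the new pair of nodes:

MATCH (p:Person {name: 'Ann', surname: 'Bolin'})-[:BELONGS_TO]->(f:Family {name: 'Bolin'}) RETURN p, f;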

Update an existing person
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

Verify that the existing Person node is now updated with the surname smith and linked to a new Family node.
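
For example, given the sink's updates template above, the following query should now return the updated Person linked to the smith Family:

MATCH (p:Person {name: 'mary', surname: 'smith'})-[:BELONGS_TO]->(f:Family {name: 'smith'}) RETURN p, f;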

Delete an existing person
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

Verify that the existing Person node is deleted.
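
For example, the following query should now return a count of zero:

MATCH (p:Person {name: 'mary'}) RETURN count(p) AS remaining;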

Summary

In this quickstart we have shown how to configure a Neo4j database both as a source of messages for Kafka topics and as a sink of those messages, creating, updating or deleting nodes and relationships in the database. Typically, our connector is used as a sink, pulling data from other data sources into Neo4j through Apache Kafka or Confluent, or as a source, pushing data from Neo4j through Apache Kafka or Confluent into other databases.

Troubleshooting

If you don't see any messages being published to the creates, updates or deletes topics, or don't see any :Family or :Person nodes being created, check the Kafka Connect logs by executing the following command and fix any reported problems.

docker compose logs connect
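
It can also help to check the status of both connectors and the list of topics on the broker. The connector names below assume the AVRO configurations used earlier; adjust them to match your own:

curl -s http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status
curl -s http://localhost:8083/connectors/Neo4jSinkConnectorCypherAVRO/status
docker compose exec broker kafka-topics --bootstrap-server broker:29092 --list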