Confluent Platform Quickstart
Overview
We will use Docker Compose to create an environment containing the Confluent Platform components and Neo4j, all running inside Docker.
The Neo4j Kafka Connector will first be configured with a source instance. The source will retrieve changes resulting from CREATE, UPDATE and DELETE operations on nodes matching the pattern (:TestSource). The received changes will be published into the creates, updates and deletes topics, depending on the operation.
Next, we will create a sink instance that listens for messages on the creates, updates and deletes topics and, when a message arrives, executes a Cypher statement to apply the corresponding change in Neo4j.
The following guide uses the Confluent Platform Docker images.
Running with Docker Compose
Copy the following Docker Compose file into a directory of your choice.
The example docker-compose.yml file below makes use of recent Docker Compose features and requires an up-to-date version of the tool. Make sure you have at least Docker Compose v2.20.3.
---
services:
  neo4j:
    image: neo4j:5-enterprise
    hostname: neo4j
    container_name: neo4j
    # this is to ensure you have the latest 5.x version of the database
    pull_policy: always
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_server_memory_heap_max__size: "4G"
    healthcheck:
      test: [ "CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1" ]
      start_period: 2m
      start_interval: 10s
      interval: 30s
      timeout: 10s
      retries: 5
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "2181" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
  broker:
    image: confluentinc/cp-server:7.5.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8081" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
  connect:
    image: confluentinc/cp-server-connect:7.5.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8083" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
Copy the following Neo4j Kafka Connector artifact into a directory named plugins, located in the same directory as your docker-compose.yml file. The directory structure should look like this:
quickstart/
├─ plugins/
│ ├─ neo4j-kafka-connect-5.1.5.jar
├─ docker-compose.yml
Open a terminal, change into the directory containing the Docker Compose file and run:
docker compose up -d
Once this completes, you should have all the modules up and running. You can check the status of all services as follows:
docker compose ps
This will return a table showing that each service is up and running:
NAME COMMAND SERVICE STATUS PORTS
broker "/etc/confluent/dock…" broker running 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect "bash -c '# confluen…" connect running 0.0.0.0:8083->8083/tcp, 9092/tcp
control-center "/etc/confluent/dock…" control-center running 0.0.0.0:9021->9021/tcp
neo4j "tini -g -- /startup…" neo4j running 0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry "/etc/confluent/dock…" schema-registry running 0.0.0.0:8081->8081/tcp
zookeeper "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
You can now access your Neo4j instance at http://localhost:7474. Log in with neo4j as the username and password as the password (update the NEO4J_AUTH environment variable in the Docker Compose file to change it). Confirm that you can access your Confluent Control Center instance at http://localhost:9021/clusters and that the cluster is reported as healthy (this may take 90-120 seconds). You should see one broker, several topics and one Connect cluster in Control Center.
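If you prefer the command line, you can also run a quick connectivity check against Neo4j from inside its container; this reuses the same cypher-shell invocation as the health check in the Compose file above:
docker compose exec neo4j cypher-shell -u neo4j -p password "RETURN 1;"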
Enabling CDC
Enable Change Data Capture on the source database by executing the following Cypher command. For more information on Change Data Capture and how to enable it, refer to Change Data Capture > Enabling CDC > Neo4j DBMS for on-premises installations, and Change Data Capture > Enabling CDC > Aura for Aura.
ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';
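To double-check that enrichment is now active, you can inspect the database options. This is a minimal sketch assuming Neo4j 5's SHOW DATABASES command, which exposes an options map when yielded explicitly:
SHOW DATABASES YIELD name, options
WHERE name = 'neo4j'
RETURN name, options.txLogEnrichment;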
Source with CDC
First, we need to set up Neo4j as a source database that will feed messages into topics. Pick one of the message serialization formats, save the contents of the provided file into a local directory, and name it source.neo4j.json.
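The file contents are not reproduced in this extract. As a rough sketch of what the AVRO variant of this file typically looks like (the connector class and the neo4j.* property names are assumptions based on the Neo4j Kafka Connector 5.x configuration style, so verify them against the connector's configuration reference):
{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "create",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "update",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "delete"
  }
}
In this sketch, the per-topic pattern entries are what route create, update and delete events on (:TestSource) nodes to their respective topics.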
Now, create the source instance with the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.neo4j.json
This will create a Kafka Connect source instance that sends change event messages into the topics named creates, updates and deletes, serialized in the format you selected. In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.
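You can also confirm this from the command line through the Kafka Connect REST API (the connector name below is the hypothetical one from the sketch above; substitute the name field from your own source.neo4j.json):
# List all deployed connectors
curl http://localhost:8083/connectors

# Show the state of the source connector and its tasks
curl http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status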
As shown above, you can configure several patterns to capture changes from and publish them to topics of your choice. With the configuration above, the connector will read changes that occur on nodes labeled TestSource. The message structure is based on Change Data Capture > Change event schema, serialized in the configured message format. Depending on the operation type, change event messages will have one of the following structures.
Create:
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": null,
      "after": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "c",
    "labels": ["TestSource"]
  }
}
Update:
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<old-name>",
          "surname": "<old-surname>"
        },
        "labels": ["TestSource"]
      },
      "after": {
        "properties": {
          "name": "<new-name>",
          "surname": "<new-surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "u",
    "labels": ["TestSource"]
  }
}
Delete:
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      },
      "after": null
    },
    "operation": "d",
    "labels": ["TestSource"]
  }
}
Now that you have a running source instance, you can create the following nodes in Neo4j:
CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});
This will result in new messages being published into the topic named creates.
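To inspect these messages from the command line, you can use the Avro console consumer bundled with the Schema Registry image (this assumes you picked the AVRO serialization format; other formats need the matching console consumer):
docker compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic creates \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081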
Sink with Cypher
With the source connector set up, the next step is to configure a sink connector that will consume the messages published to the creates, updates and deletes topics.
First, save the provided JSON file into a local directory, naming it sink.neo4j.json.
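Again, the file contents are not reproduced in this extract. An illustrative AVRO sketch follows; the connector class, the neo4j.* property names, the __value binding and the :Person/:Family/:BELONGS_TO graph model used in the Cypher templates are assumptions, so check them against the connector's configuration reference and adapt the queries to your needs:
{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "creates,updates,deletes",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) OPTIONAL MATCH (p)-[r:BELONGS_TO]->(:Family) DELETE r SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS state MATCH (p:Person {name: state.properties.name, surname: state.properties.surname}) DETACH DELETE p"
  }
}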
Now, create the sink instance with the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @sink.neo4j.json
This will configure the sink instance to consume data in your chosen serialization format. The Cypher strategy will build Cypher queries based on the Cypher query templates defined by the neo4j.cypher.topic.creates, neo4j.cypher.topic.updates and neo4j.cypher.topic.deletes properties.
Testing
You can now access your Confluent Control Center instance at http://localhost:9021/clusters and verify that at least the topic named creates has been created (as specified in the connector configuration), and that both the source and sink connector instances are running under Connect, connect-default.
With both the source and sink connectors running, the :TestSource nodes created earlier will cause the source instance to publish messages into the creates topic. The sink instance will in turn consume these messages and create the corresponding :Person and :Family nodes in Neo4j. The updates and deletes topics will also be created as you update and delete nodes labeled TestSource.
Verify that this is indeed the case by executing the following query in Neo4j Browser at http://localhost:7474/browser/:
MATCH (n:(Person | Family)) RETURN n
You can now create, update or delete Person and Family nodes by executing more statements like the following:
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});
Verify that new Person and Family nodes are created and linked to each other.
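One way to check this (the exact relationship type depends on your sink's Cypher templates, so the query below matches any outgoing relationship between the two nodes):
MATCH (p:Person {name: 'Ann'})-[r]->(f:Family {name: 'Bolin'})
RETURN p, type(r), f;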
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';
Verify that the existing Person node has been updated with the surname smith and linked to a new Family node.
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;
Verify that the existing Person node has been deleted.
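A final check, which should now return an empty result:
MATCH (p:Person {name: 'mary'}) RETURN p;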