在 Confluent Platform 上迁移
步骤
-
打开 Confluent Control Center 实例(地址为
http://localhost:9021/clusters
),找到已注册的源或目标连接器。找到后,将其删除。 -
查看
connect
容器的日志 (docker-compose logs -f connect
)。作为连接器删除/关闭的一部分,迁移后的配置将被打印到日志中。示例connect | [2024-09-04 09:18:40,066] INFO The migrated settings for 5.1 version of Neo4j Source Connector 'Neo4jSourceConnectorAVRO' is: `{ connect | "connector.class" : "org.neo4j.connectors.kafka.source.Neo4jConnector", connect | "neo4j.authentication.basic.password" : "", connect | "neo4j.uri" : "bolt://neo4j:7687", connect | "neo4j.query" : "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", connect | "neo4j.query.streaming-property" : "timestamp", connect | "value.converter.schema.registry.url" : "http://schema-registry:8081", connect | "task.class" : "streams.kafka.connect.source.Neo4jSourceTask", connect | "neo4j.authentication.basic.username" : "neo4j", connect | "name" : "Neo4jSourceConnectorAVRO", connect | "neo4j.query.topic" : "my-topic", connect | "value.converter" : "io.confluent.connect.avro.AvroConverter", connect | "key.converter" : "io.confluent.connect.avro.AvroConverter", connect | "key.converter.schema.registry.url" : "http://schema-registry:8081", connect | "neo4j.query.poll-interval" : "5000ms", connect | "neo4j.start-from" : "USER_PROVIDED", connect | "neo4j.start-from.value" : 1725441505774 connect | }` (streams.kafka.connect.source.Neo4jSourceService)
-
创建一个新的 JSON 文件
(source/sink)_migrated.neo4j.json
,并将迁移后的示例配置连同连接器名称复制到此文件中。配置应包含一个name
属性,并且迁移后的配置应嵌套在config
键中。 -
验证新配置是否包含与之前版本的连接器配置相比所有相关的键。
-
按照《安装》指南下载新的连接器版本,并删除原始的 5.0.x 连接器版本。确保将新插件复制到 Docker 设置期间创建的
./plugins/
文件夹中。 -
通过运行
docker-compose restart connect
重启 Kafka Connect Worker。这使得 Kafka Connect 平台可以从./plugins/
文件夹中加载新插件。 -
继续遵循《Confluent Platform 快速入门》将插件部署到带有新配置的 Kafka Connect。
-
一旦连接器启动并运行,请验证它是否成功运行。