在 Confluent Platform 上迁移

步骤
  1. 打开 Confluent Control Center 实例(地址为 http://localhost:9021/clusters),找到已注册的源或目标连接器。找到后,将其删除。

  2. 查看 connect 容器的日志 (docker-compose logs -f connect)。作为连接器删除/关闭的一部分,迁移后的配置将被打印到日志中。示例

    connect  | [2024-09-04 09:18:40,066] INFO The migrated settings for 5.1 version of Neo4j Source Connector 'Neo4jSourceConnectorAVRO' is: `{
    connect  |   "connector.class" : "org.neo4j.connectors.kafka.source.Neo4jConnector",
    connect  |   "neo4j.authentication.basic.password" : "",
    connect  |   "neo4j.uri" : "bolt://neo4j:7687",
    connect  |   "neo4j.query" : "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    connect  |   "neo4j.query.streaming-property" : "timestamp",
    connect  |   "value.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "task.class" : "streams.kafka.connect.source.Neo4jSourceTask",
    connect  |   "neo4j.authentication.basic.username" : "neo4j",
    connect  |   "name" : "Neo4jSourceConnectorAVRO",
    connect  |   "neo4j.query.topic" : "my-topic",
    connect  |   "value.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "neo4j.query.poll-interval" : "5000ms",
    connect  |   "neo4j.start-from" : "USER_PROVIDED",
    connect  |   "neo4j.start-from.value" : 1725441505774
    connect  | }` (streams.kafka.connect.source.Neo4jSourceService)
  3. 创建一个新的 JSON 文件 (source/sink)_migrated.neo4j.json,并将迁移后的示例配置连同连接器名称复制到此文件中。配置应包含一个 name 属性,并且迁移后的配置应嵌套在 config 键中。

  4. 验证新配置是否包含与之前版本的连接器配置相比所有相关的键。

    1. 有关新配置选项的说明和示例,请参阅《源配置设置》《目标配置设置》

    2. 如果迁移源组件,请注意 neo4j.start-from.value 已设置为最后检查的偏移量。

    3. 替换敏感值。

      原始配置中的敏感值不会打印在迁移后的配置中。这些键需要填入适当的值。受影响的配置键是 neo4j.authentication.basic.passwordneo4j.authentication.kerberos.ticket
  5. 按照《安装》指南下载新的连接器版本,并删除原始的 5.0.x 连接器版本。确保将新插件复制到 Docker 设置期间创建的 ./plugins/ 文件夹中。

  6. 通过运行 docker-compose restart connect 重启 Kafka Connect Worker。这使得 Kafka Connect 平台可以从 ./plugins/ 文件夹中加载新插件。

  7. 继续遵循《Confluent Platform 快速入门》将插件部署到带有新配置的 Kafka Connect。

  8. 一旦连接器启动并运行,请验证它是否成功运行。

© . All rights reserved.