Confluent 平台迁移

本指南遵循 Confluent 平台快速入门.

步骤
  1. http://localhost:9021/clusters 打开 Confluent 控制中心实例并找到已注册的源或接收器连接器。找到后,将其删除。

  2. 查看 connect 容器的日志 (docker-compose logs -f connect)。迁移后的配置将作为连接器删除/关闭的一部分打印到日志中。例如

    connect  | [2024-09-04 09:18:40,066] INFO The migrated settings for 5.1 version of Neo4j Source Connector 'Neo4jSourceConnectorAVRO' is: `{
    connect  |   "connector.class" : "org.neo4j.connectors.kafka.source.Neo4jConnector",
    connect  |   "neo4j.authentication.basic.password" : "",
    connect  |   "neo4j.uri" : "bolt://neo4j:7687",
    connect  |   "neo4j.query" : "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    connect  |   "neo4j.query.streaming-property" : "timestamp",
    connect  |   "value.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "task.class" : "streams.kafka.connect.source.Neo4jSourceTask",
    connect  |   "neo4j.authentication.basic.username" : "neo4j",
    connect  |   "name" : "Neo4jSourceConnectorAVRO",
    connect  |   "neo4j.query.topic" : "my-topic",
    connect  |   "value.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "neo4j.query.poll-interval" : "5000ms",
    connect  |   "neo4j.start-from" : "USER_PROVIDED",
    connect  |   "neo4j.start-from.value" : 1725441505774
    connect  | }` (streams.kafka.connect.source.Neo4jSourceService)
  3. 创建一个新的 JSON 文件 (source/sink)_migrated.neo4j.json 并将迁移后的示例配置复制到此文件中,以及连接器名称。该配置应包含一个 name 属性,迁移后的配置应嵌套在 config 键中。

  4. 验证新配置是否包含与连接器配置的先前版本相比的所有相关键。

    1. 有关新配置选项的描述和示例,请参见 源配置设置接收器配置设置

    2. 如果迁移源组件,请注意已设置为上次检查偏移量的 neo4j.start-from.value

    3. 替换敏感值。

      原始配置中的敏感值不会打印在迁移后的配置中。这些键需要用适当的值填充。受影响的配置键是 neo4j.authentication.basic.passwordneo4j.authentication.kerberos.ticket
  5. 按照 安装 指南下载新的连接器版本,并删除原始的 5.0.x 连接器版本。确保将新插件复制到 Docker 设置期间创建的 ./plugins/ 文件夹中。

  6. 通过运行 docker-compose restart connect 重新启动 Kafka Connect 工作器。这允许 Kafka Connect 平台从 ./plugins/ 文件夹中获取新的插件。

  7. 继续按照 Confluent 平台快速入门 将插件部署到 Kafka Connect,并使用新配置。

  8. 连接器启动并运行后,验证它是否成功运行。